Merge branch '2.0' into 2.1
This commit is contained in:
@ -162,6 +162,11 @@ data block. The default value is 1 transaction.
|
|||||||
Controls the number of row events that are grouped into a single Avro
|
Controls the number of row events that are grouped into a single Avro
|
||||||
data block. The default value is 1000 row events.
|
data block. The default value is 1000 row events.
|
||||||
|
|
||||||
|
#### `block_size`
|
||||||
|
|
||||||
|
The Avro data block size in bytes. The default is 16 kilobytes. Increase this
|
||||||
|
value if individual events in the binary logs are very large.
|
||||||
|
|
||||||
## Module commands
|
## Module commands
|
||||||
|
|
||||||
Read [Module Commands](../Reference/Module-Commands.md) documentation for details about module commands.
|
Read [Module Commands](../Reference/Module-Commands.md) documentation for details about module commands.
|
||||||
|
@ -97,6 +97,7 @@ const char* column_type_to_string(uint8_t type)
|
|||||||
case TABLE_COL_TYPE_GEOMETRY:
|
case TABLE_COL_TYPE_GEOMETRY:
|
||||||
return "GEOMETRY";
|
return "GEOMETRY";
|
||||||
default:
|
default:
|
||||||
|
ss_dassert(false);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
return "UNKNOWN";
|
return "UNKNOWN";
|
||||||
@ -215,7 +216,6 @@ static void unpack_year(uint8_t *ptr, struct tm *dest)
|
|||||||
dest->tm_year = *ptr;
|
dest->tm_year = *ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef USE_OLD_DATETIME
|
|
||||||
/**
|
/**
|
||||||
* @brief Unpack a DATETIME
|
* @brief Unpack a DATETIME
|
||||||
*
|
*
|
||||||
@ -224,8 +224,10 @@ static void unpack_year(uint8_t *ptr, struct tm *dest)
|
|||||||
* @param val Value read from the binary log
|
* @param val Value read from the binary log
|
||||||
* @param dest Pointer where the unpacked value is stored
|
* @param dest Pointer where the unpacked value is stored
|
||||||
*/
|
*/
|
||||||
static void unpack_datetime(uint8_t *ptr, uint8_t decimals, struct tm *dest)
|
static void unpack_datetime(uint8_t *ptr, struct tm *dest)
|
||||||
{
|
{
|
||||||
|
uint64_t val = 0;
|
||||||
|
memcpy(&val, ptr, sizeof(val));
|
||||||
uint32_t second = val - ((val / 100) * 100);
|
uint32_t second = val - ((val / 100) * 100);
|
||||||
val /= 100;
|
val /= 100;
|
||||||
uint32_t minute = val - ((val / 100) * 100);
|
uint32_t minute = val - ((val / 100) * 100);
|
||||||
@ -240,13 +242,12 @@ static void unpack_datetime(uint8_t *ptr, uint8_t decimals, struct tm *dest)
|
|||||||
|
|
||||||
memset(dest, 0, sizeof(struct tm));
|
memset(dest, 0, sizeof(struct tm));
|
||||||
dest->tm_year = year - 1900;
|
dest->tm_year = year - 1900;
|
||||||
dest->tm_mon = month;
|
dest->tm_mon = month - 1;
|
||||||
dest->tm_mday = day;
|
dest->tm_mday = day;
|
||||||
dest->tm_hour = hour;
|
dest->tm_hour = hour;
|
||||||
dest->tm_min = minute;
|
dest->tm_min = minute;
|
||||||
dest->tm_sec = second;
|
dest->tm_sec = second;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unpack a 5 byte reverse byte order value
|
* Unpack a 5 byte reverse byte order value
|
||||||
@ -412,6 +413,8 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals)
|
|||||||
return 3 + ((decimals + 1) / 2);
|
return 3 + ((decimals + 1) / 2);
|
||||||
|
|
||||||
case TABLE_COL_TYPE_DATETIME:
|
case TABLE_COL_TYPE_DATETIME:
|
||||||
|
return 8;
|
||||||
|
|
||||||
case TABLE_COL_TYPE_TIMESTAMP:
|
case TABLE_COL_TYPE_TIMESTAMP:
|
||||||
return 4;
|
return 4;
|
||||||
|
|
||||||
@ -447,8 +450,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru
|
|||||||
break;
|
break;
|
||||||
|
|
||||||
case TABLE_COL_TYPE_DATETIME:
|
case TABLE_COL_TYPE_DATETIME:
|
||||||
// This is not used with MariaDB RBR
|
unpack_datetime(ptr, tm);
|
||||||
//unpack_datetime(ptr, *metadata, tm);
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case TABLE_COL_TYPE_DATETIME2:
|
case TABLE_COL_TYPE_DATETIME2:
|
||||||
@ -467,6 +469,10 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru
|
|||||||
case TABLE_COL_TYPE_TIMESTAMP2:
|
case TABLE_COL_TYPE_TIMESTAMP2:
|
||||||
unpack_timestamp(ptr, *metadata, tm);
|
unpack_timestamp(ptr, *metadata, tm);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
ss_dassert(false);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
return temporal_field_size(type, *metadata);
|
return temporal_field_size(type, *metadata);
|
||||||
}
|
}
|
||||||
@ -596,6 +602,10 @@ static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes)
|
|||||||
((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) |
|
((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) |
|
||||||
((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56);
|
((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
ss_dassert(false);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
return val;
|
return val;
|
||||||
|
@ -184,6 +184,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
|
|||||||
{"group_rows", MXS_MODULE_PARAM_COUNT, "1000"},
|
{"group_rows", MXS_MODULE_PARAM_COUNT, "1000"},
|
||||||
{"group_trx", MXS_MODULE_PARAM_COUNT, "1"},
|
{"group_trx", MXS_MODULE_PARAM_COUNT, "1"},
|
||||||
{"start_index", MXS_MODULE_PARAM_COUNT, "1"},
|
{"start_index", MXS_MODULE_PARAM_COUNT, "1"},
|
||||||
|
{"block_size", MXS_MODULE_PARAM_COUNT, "0"},
|
||||||
{MXS_END_MODULE_PARAMS}
|
{MXS_END_MODULE_PARAMS}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -405,6 +406,7 @@ createInstance(SERVICE *service, char **options)
|
|||||||
inst->row_target = config_get_integer(params, "group_rows");
|
inst->row_target = config_get_integer(params, "group_rows");
|
||||||
inst->trx_target = config_get_integer(params, "group_trx");
|
inst->trx_target = config_get_integer(params, "group_trx");
|
||||||
int first_file = config_get_integer(params, "start_index");
|
int first_file = config_get_integer(params, "start_index");
|
||||||
|
inst->block_size = config_get_integer(params, "block_size");
|
||||||
|
|
||||||
MXS_CONFIG_PARAMETER *param = config_get_param(params, "source");
|
MXS_CONFIG_PARAMETER *param = config_get_param(params, "source");
|
||||||
bool err = false;
|
bool err = false;
|
||||||
@ -479,6 +481,10 @@ createInstance(SERVICE *service, char **options)
|
|||||||
{
|
{
|
||||||
first_file = MXS_MAX(1, atoi(value));
|
first_file = MXS_MAX(1, atoi(value));
|
||||||
}
|
}
|
||||||
|
else if (strcmp(options[i], "block_size") == 0)
|
||||||
|
{
|
||||||
|
inst->block_size = atoi(value);
|
||||||
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
MXS_WARNING("Unknown router option: '%s'", options[i]);
|
MXS_WARNING("Unknown router option: '%s'", options[i]);
|
||||||
@ -1054,14 +1060,20 @@ void converter_func(void* data)
|
|||||||
while (!router->service->svc_do_shutdown && ok && binlog_end == AVRO_OK)
|
while (!router->service->svc_do_shutdown && ok && binlog_end == AVRO_OK)
|
||||||
{
|
{
|
||||||
uint64_t start_pos = router->current_pos;
|
uint64_t start_pos = router->current_pos;
|
||||||
|
char binlog_name[BINLOG_FNAMELEN + 1];
|
||||||
|
strcpy(binlog_name, router->binlog_name);
|
||||||
|
|
||||||
if (avro_open_binlog(router->binlogdir, router->binlog_name, &router->binlog_fd))
|
if (avro_open_binlog(router->binlogdir, router->binlog_name, &router->binlog_fd))
|
||||||
{
|
{
|
||||||
binlog_end = avro_read_all_events(router);
|
binlog_end = avro_read_all_events(router);
|
||||||
|
|
||||||
if (router->current_pos != start_pos)
|
if (router->current_pos != start_pos || strcmp(binlog_name, router->binlog_name) != 0)
|
||||||
{
|
{
|
||||||
/** We processed some data, reset the conversion task delay */
|
/** We processed some data, reset the conversion task delay */
|
||||||
router->task_delay = 1;
|
router->task_delay = 1;
|
||||||
|
|
||||||
|
/** Update the GTID index */
|
||||||
|
avro_update_index(router);
|
||||||
}
|
}
|
||||||
|
|
||||||
avro_close_binlog(router->binlog_fd);
|
avro_close_binlog(router->binlog_fd);
|
||||||
|
@ -106,7 +106,7 @@ void avro_close_binlog(int fd)
|
|||||||
* @param filepath Path to the created file
|
* @param filepath Path to the created file
|
||||||
* @param json_schema The schema of the table in JSON format
|
* @param json_schema The schema of the table in JSON format
|
||||||
*/
|
*/
|
||||||
AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema)
|
AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, size_t block_size)
|
||||||
{
|
{
|
||||||
AVRO_TABLE *table = MXS_CALLOC(1, sizeof(AVRO_TABLE));
|
AVRO_TABLE *table = MXS_CALLOC(1, sizeof(AVRO_TABLE));
|
||||||
if (table)
|
if (table)
|
||||||
@ -127,7 +127,7 @@ AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema)
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
rc = avro_file_writer_create(filepath, table->avro_schema, &table->avro_file);
|
rc = avro_file_writer_create_with_codec(filepath, table->avro_schema, &table->avro_file, "null", block_size);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rc)
|
if (rc)
|
||||||
@ -883,12 +883,6 @@ void avro_flush_all_tables(AVRO_INSTANCE *router, enum avrorouter_file_op flush)
|
|||||||
}
|
}
|
||||||
hashtable_iterator_free(iter);
|
hashtable_iterator_free(iter);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Update the GTID index */
|
|
||||||
if (flush == AVROROUTER_FLUSH)
|
|
||||||
{
|
|
||||||
avro_update_index(router);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -105,7 +105,7 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
|
|||||||
|
|
||||||
/** Close the file and open a new one */
|
/** Close the file and open a new one */
|
||||||
hashtable_delete(router->open_tables, table_ident);
|
hashtable_delete(router->open_tables, table_ident);
|
||||||
AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema);
|
AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema, router->block_size);
|
||||||
|
|
||||||
if (avro_table)
|
if (avro_table)
|
||||||
{
|
{
|
||||||
@ -289,14 +289,19 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
* beforehand so we must continue processing them until we reach the end
|
* beforehand so we must continue processing them until we reach the end
|
||||||
* of the event. */
|
* of the event. */
|
||||||
int rows = 0;
|
int rows = 0;
|
||||||
|
|
||||||
while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN)
|
while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN)
|
||||||
{
|
{
|
||||||
/** Add the current GTID and timestamp */
|
/** Add the current GTID and timestamp */
|
||||||
uint8_t *end = ptr + hdr->event_size;
|
uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
|
||||||
int event_type = get_event_type(hdr->event_type);
|
int event_type = get_event_type(hdr->event_type);
|
||||||
prepare_record(router, hdr, event_type, &record);
|
prepare_record(router, hdr, event_type, &record);
|
||||||
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
|
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
|
||||||
avro_file_writer_append_value(table->avro_file, &record);
|
if (avro_file_writer_append_value(table->avro_file, &record))
|
||||||
|
{
|
||||||
|
MXS_ERROR("Failed to write value at position %ld: %s",
|
||||||
|
router->current_pos, avro_strerror());
|
||||||
|
}
|
||||||
|
|
||||||
/** Update rows events have the before and after images of the
|
/** Update rows events have the before and after images of the
|
||||||
* affected rows so we'll process them as another record with
|
* affected rows so we'll process them as another record with
|
||||||
@ -305,7 +310,11 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
|
|||||||
{
|
{
|
||||||
prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record);
|
prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record);
|
||||||
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
|
ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
|
||||||
avro_file_writer_append_value(table->avro_file, &record);
|
if (avro_file_writer_append_value(table->avro_file, &record))
|
||||||
|
{
|
||||||
|
MXS_ERROR("Failed to write value at position %ld: %s",
|
||||||
|
router->current_pos, avro_strerror());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
rows++;
|
rows++;
|
||||||
@ -501,15 +510,24 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
|||||||
for (long i = 0; i < map->columns && npresent < ncolumns; i++)
|
for (long i = 0; i < map->columns && npresent < ncolumns; i++)
|
||||||
{
|
{
|
||||||
ss_dassert(create->columns == map->columns);
|
ss_dassert(create->columns == map->columns);
|
||||||
avro_value_get_by_name(record, create->column_names[i], &field, NULL);
|
ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL);
|
||||||
|
ss_dassert(rc == 0);
|
||||||
|
|
||||||
if (bit_is_set(columns_present, ncolumns, i))
|
if (bit_is_set(columns_present, ncolumns, i))
|
||||||
{
|
{
|
||||||
npresent++;
|
npresent++;
|
||||||
if (bit_is_set(null_bitmap, ncolumns, i))
|
if (bit_is_set(null_bitmap, ncolumns, i))
|
||||||
|
{
|
||||||
|
if (column_is_blob(map->column_types[i]))
|
||||||
|
{
|
||||||
|
uint8_t nullvalue = 0;
|
||||||
|
avro_value_set_bytes(&field, &nullvalue, 1);
|
||||||
|
}
|
||||||
|
else
|
||||||
{
|
{
|
||||||
avro_value_set_null(&field);
|
avro_value_set_null(&field);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
else if (column_is_fixed_string(map->column_types[i]))
|
else if (column_is_fixed_string(map->column_types[i]))
|
||||||
{
|
{
|
||||||
/** ENUM and SET are stored as STRING types with the type stored
|
/** ENUM and SET are stored as STRING types with the type stored
|
||||||
@ -597,8 +615,16 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
|
|||||||
uint64_t len = 0;
|
uint64_t len = 0;
|
||||||
memcpy(&len, ptr, bytes);
|
memcpy(&len, ptr, bytes);
|
||||||
ptr += bytes;
|
ptr += bytes;
|
||||||
|
if (len)
|
||||||
|
{
|
||||||
avro_value_set_bytes(&field, ptr, len);
|
avro_value_set_bytes(&field, ptr, len);
|
||||||
ptr += len;
|
ptr += len;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
uint8_t nullvalue = 0;
|
||||||
|
avro_value_set_bytes(&field, &nullvalue, 1);
|
||||||
|
}
|
||||||
ss_dassert(ptr < end);
|
ss_dassert(ptr < end);
|
||||||
}
|
}
|
||||||
else if (column_is_temporal(map->column_types[i]))
|
else if (column_is_temporal(map->column_types[i]))
|
||||||
|
@ -274,6 +274,7 @@ typedef struct avro_instance
|
|||||||
uint64_t row_count; /*< Row events processed */
|
uint64_t row_count; /*< Row events processed */
|
||||||
uint64_t row_target; /*< Minimum about of row events that will trigger
|
uint64_t row_target; /*< Minimum about of row events that will trigger
|
||||||
* a flush of all tables */
|
* a flush of all tables */
|
||||||
|
uint64_t block_size; /**< Avro datablock size */
|
||||||
struct avro_instance *next;
|
struct avro_instance *next;
|
||||||
} AVRO_INSTANCE;
|
} AVRO_INSTANCE;
|
||||||
|
|
||||||
@ -291,7 +292,7 @@ extern void avro_client_rotate(AVRO_INSTANCE *router, AVRO_CLIENT *client, uint8
|
|||||||
extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd);
|
extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd);
|
||||||
extern void avro_close_binlog(int fd);
|
extern void avro_close_binlog(int fd);
|
||||||
extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router);
|
extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router);
|
||||||
extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema);
|
extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, size_t block_size);
|
||||||
extern void avro_table_free(AVRO_TABLE *table);
|
extern void avro_table_free(AVRO_TABLE *table);
|
||||||
extern char* json_new_schema_from_table(TABLE_MAP *map);
|
extern char* json_new_schema_from_table(TABLE_MAP *map);
|
||||||
extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);
|
extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);
|
||||||
|
Reference in New Issue
Block a user