Merge branch '2.1' into develop

Markus Mäkelä
2017-03-06 11:25:44 +02:00
12 changed files with 177 additions and 105 deletions

View File

@@ -119,9 +119,10 @@ This functionality is similar to the [Multi-Master Monitor](MM-Monitor.md)
 functionality. The only difference is that the MySQL monitor will also detect
 traditional Master-Slave topologies.
 
-### `failover`
+### `detect_standalone_master`
 
-Failover mode. This feature takes a boolean parameter is disabled by default.
+Detect standalone master servers. This feature takes a boolean parameter and is
+disabled by default. In MaxScale 2.1.0, this parameter was called `failover`.
 
 This parameter is intended to be used with simple, two node master-slave pairs
 where the failure of the master can be resolved by "promoting" the slave as the
@@ -130,22 +131,40 @@ new master. Normally this is done by using an external agent of some sort
 [MariaDB Replication Manager](https://github.com/tanji/replication-manager)
 or [MHA](https://code.google.com/p/mysql-master-ha/).
 
-The failover mode in mysqlmon is completely passive in the sense that it does
-not modify the cluster or any servers in it. It labels a slave server as a
-master server when there is only one running server. Before a failover can be
-initiated, the following conditions must have been met:
+When the number of running servers in the cluster drops down to one, MaxScale
+cannot be absolutely certain whether the last remaining server is a master or a
+slave. At this point, MaxScale will try to deduce the type of the server by
+looking at the system variables of the server in question.
+
+By default, MaxScale will only attempt to deduce if the server can be used as a
+slave server (controlled by the `detect_stale_slave` parameter). When the
+`detect_standalone_master` mode is enabled, MaxScale will also attempt to deduce
+whether the server can be used as a master server. This is done by checking that
+the server is not in read-only mode and that it is not configured as a slave.
+
+This mode in mysqlmon is completely passive in the sense that it does not modify
+the cluster or any of the servers in it. It only labels the last remaining
+server in a cluster as the master server.
+
+Before a server is labeled as a standalone master, the following conditions must
+have been met:
 
-- The monitor has repeatedly failed to connect to the failed servers
+- Previous attempts to connect to other servers in the cluster have failed,
+  controlled by the `failcount` parameter
 - There is only one running server among the monitored servers
-- @@read_only is not enabled on the last running server
+- The value of the `@@read_only` system variable is set to `OFF`
+
+In 2.1.1, the following additional condition was added:
+
 - The last running server is not configured as a slave
 
-When these conditions are met, the monitor assigns the last remaining server the
-master status and puts all other servers into maintenance mode. This is done to
-prevent accidental use of the failed servers if they came back online.
-
-When the failed servers come back up, the maintenance mode needs to be manually
-cleared once replication has been set up.
+If the value of the `allow_cluster_recovery` parameter is set to false, the monitor
+sets all other servers into maintenance mode. This is done to prevent accidental
+use of the failed servers if they came back online. If the failed servers come
+back up, the maintenance mode needs to be manually cleared once replication has
+been set up.
 
 **Note**: A failover will cause permanent changes in the data of the promoted
 server. Only use this feature if you know that the slave servers are capable
@@ -153,32 +172,33 @@ cleared once replication has been set up.
 
 ### `failcount`
 
-Number of failures that must occur on all failed servers before a failover is
-initiated. The default value is 5 failures.
+Number of failures that must occur on all failed servers before a standalone
+server is labeled as a master. The default value is 5 failures.
 
-The monitor will attemt to contact all servers once per monitoring cycle. When
-_failover_ mode is enabled, all of the failed servers must fail _failcount_
-number of connection attemps before a failover is initiated.
+The monitor will attempt to contact all servers once per monitoring cycle. When
+`detect_standalone_master` is enabled, all of the failed servers must fail
+_failcount_ number of connection attempts before the last server is labeled as
+the master.
 
-The formula for calculating the actual number of milliseconds before failover
-can start is `monitor_interval * failcount`. This means that to trigger a
-failover after 10 seconds of master failure with a _monitor_interval_ of 1000
-milliseconds, the value of _failcount_ must be 10.
+The formula for calculating the actual number of milliseconds before the server
+is labeled as the master is `monitor_interval * failcount`.
 
-### `failover_recovery`
+### `allow_cluster_recovery`
 
-Allow recovery after failover. This feature takes a boolean parameter is
-enabled by default.
+Allow recovery after the cluster has dropped down to one server. This feature
+takes a boolean parameter is enabled by default. This parameter requires that
+`detect_standalone_master` is set to true. In MaxScale 2.1.0, this parameter was
+called `failover_recovery`.
 
-When this parameter is disabled, if a failover has been triggered and the last
-remaining server is chosen as the master, the monitor will set all of the failed
-servers into maintenance mode. When this option is enabled, the failed servers
-are allowed to rejoin the cluster.
+When this parameter is disabled, if the last remaining server is labeled as the
+master, the monitor will set all of the failed servers into maintenance
+mode. When this option is enabled, the failed servers are allowed to rejoin the
+cluster.
 
-This option should be enabled when failover in MaxScale is used in conjunction
-with an external agent that resets the slave status for new master servers. One
-of these agents is the _replication-manager_ which clears the slave
-configuration for each new master and removes the read-only mode.
+This option should be enabled only when MaxScale is used in conjunction with an
+external agent that automatically reintegrates failed servers into the
+cluster. One of these agents is the _replication-manager_ which automatically
+configures the failed servers as new slaves of the current master.
 
 ## Example 1 - Monitor script
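The renamed monitor parameters above could be combined in a monitor section like the following sketch (server names, credentials, and section name are hypothetical, not taken from this commit):

```ini
# Hypothetical maxscale.cnf fragment for MaxScale 2.1.1 or later
[MySQL-Monitor]
type=monitor
module=mysqlmon
servers=server1,server2
user=monitor_user
password=monitor_pw
monitor_interval=1000
# Called `failover` in MaxScale 2.1.0
detect_standalone_master=true
# 5 failed checks * monitor_interval of 1000 ms = 5 seconds
# before the last remaining server is labeled as the master
failcount=5
# Called `failover_recovery` in MaxScale 2.1.0
allow_cluster_recovery=true
```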

View File

@@ -171,6 +171,11 @@ data block. The default value is 1 transaction.
 Controls the number of row events that are grouped into a single Avro
 data block. The default value is 1000 row events.
 
+#### `block_size`
+
+The Avro data block size in bytes. The default is 16 kilobytes. Increase this
+value if individual events in the binary logs are very large.
+
 ## Module commands
 
 Read [Module Commands](../Reference/Module-Commands.md) documentation for details about module commands.
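The new `block_size` parameter documented above could be set as follows (service and server names are hypothetical):

```ini
# Hypothetical avrorouter service fragment
[avro-service]
type=service
router=avrorouter
source=replication-service
group_rows=1000
group_trx=1
# Avro data block size in bytes; per the documentation above,
# the default is 16 kilobytes
block_size=1048576
```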

View File

@@ -97,6 +97,7 @@ const char* column_type_to_string(uint8_t type)
     case TABLE_COL_TYPE_GEOMETRY:
         return "GEOMETRY";
     default:
+        ss_dassert(false);
         break;
     }
     return "UNKNOWN";
@@ -215,7 +216,6 @@ static void unpack_year(uint8_t *ptr, struct tm *dest)
     dest->tm_year = *ptr;
 }
 
-#ifdef USE_OLD_DATETIME
 /**
  * @brief Unpack a DATETIME
  *
@@ -224,8 +224,10 @@ static void unpack_year(uint8_t *ptr, struct tm *dest)
  * @param val Value read from the binary log
  * @param dest Pointer where the unpacked value is stored
  */
-static void unpack_datetime(uint8_t *ptr, uint8_t decimals, struct tm *dest)
+static void unpack_datetime(uint8_t *ptr, struct tm *dest)
 {
+    uint64_t val = 0;
+    memcpy(&val, ptr, sizeof(val));
     uint32_t second = val - ((val / 100) * 100);
     val /= 100;
     uint32_t minute = val - ((val / 100) * 100);
@@ -240,13 +242,12 @@ static void unpack_datetime(uint8_t *ptr, uint8_t decimals, struct tm *dest)
     memset(dest, 0, sizeof(struct tm));
     dest->tm_year = year - 1900;
-    dest->tm_mon = month;
+    dest->tm_mon = month - 1;
     dest->tm_mday = day;
     dest->tm_hour = hour;
     dest->tm_min = minute;
     dest->tm_sec = second;
 }
-#endif
 
 /**
  * Unpack a 5 byte reverse byte order value
@@ -412,6 +413,8 @@ static size_t temporal_field_size(uint8_t type, uint8_t decimals)
         return 3 + ((decimals + 1) / 2);
 
     case TABLE_COL_TYPE_DATETIME:
+        return 8;
+
     case TABLE_COL_TYPE_TIMESTAMP:
         return 4;
@@ -447,8 +450,7 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru
         break;
 
     case TABLE_COL_TYPE_DATETIME:
-        // This is not used with MariaDB RBR
-        //unpack_datetime(ptr, *metadata, tm);
+        unpack_datetime(ptr, tm);
         break;
 
     case TABLE_COL_TYPE_DATETIME2:
@@ -467,6 +469,10 @@ size_t unpack_temporal_value(uint8_t type, uint8_t *ptr, uint8_t *metadata, stru
     case TABLE_COL_TYPE_TIMESTAMP2:
         unpack_timestamp(ptr, *metadata, tm);
         break;
+
+    default:
+        ss_dassert(false);
+        break;
     }
     return temporal_field_size(type, *metadata);
 }
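The re-enabled `unpack_datetime()` above decodes an old-format DATETIME stored as one decimal integer of the form YYYYMMDDHHMMSS, peeling off one base-100 digit pair per field. A minimal standalone sketch of that extraction (the helper name `unpack_datetime_sketch` is hypothetical, and `%` replaces the equivalent `val - ((val / 100) * 100)` form used in the diff):

```c
#include <stdint.h>
#include <string.h>
#include <time.h>

/* Decode a packed decimal DATETIME of the form YYYYMMDDHHMMSS,
 * extracting one base-100 digit pair per field, lowest field first. */
static void unpack_datetime_sketch(uint64_t val, struct tm *dest)
{
    uint32_t second = val % 100;
    val /= 100;
    uint32_t minute = val % 100;
    val /= 100;
    uint32_t hour = val % 100;
    val /= 100;
    uint32_t day = val % 100;
    val /= 100;
    uint32_t month = val % 100;
    val /= 100;
    uint32_t year = (uint32_t)val;

    memset(dest, 0, sizeof(struct tm));
    dest->tm_year = year - 1900; /* struct tm counts years from 1900 */
    dest->tm_mon = month - 1;    /* struct tm counts months from 0, the bug fixed above */
    dest->tm_mday = day;
    dest->tm_hour = hour;
    dest->tm_min = minute;
    dest->tm_sec = second;
}
```

For example, the value 20170306112544 decodes to 2017-03-06 11:25:44, i.e. `tm_year == 117` and `tm_mon == 2`.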
@@ -560,42 +566,46 @@ static uint64_t unpack_bytes(uint8_t *ptr, size_t bytes)
     switch (bytes)
     {
     case 1:
         val = ptr[0];
         break;
 
     case 2:
         val = ptr[1] | ((uint64_t)(ptr[0]) << 8);
         break;
 
     case 3:
         val = (uint64_t)ptr[2] | ((uint64_t)ptr[1] << 8) |
             ((uint64_t)ptr[0] << 16);
         break;
 
     case 4:
         val = (uint64_t)ptr[3] | ((uint64_t)ptr[2] << 8) |
             ((uint64_t)ptr[1] << 16) | ((uint64_t)ptr[0] << 24);
         break;
 
     case 5:
         val = (uint64_t)ptr[4] | ((uint64_t)ptr[3] << 8) |
             ((uint64_t)ptr[2] << 16) | ((uint64_t)ptr[1] << 24) |
             ((uint64_t)ptr[0] << 32);
         break;
 
     case 6:
         val = (uint64_t)ptr[5] | ((uint64_t)ptr[4] << 8) |
             ((uint64_t)ptr[3] << 16) | ((uint64_t)ptr[2] << 24) |
             ((uint64_t)ptr[1] << 32) | ((uint64_t)ptr[0] << 40);
         break;
 
     case 7:
         val = (uint64_t)ptr[6] | ((uint64_t)ptr[5] << 8) |
             ((uint64_t)ptr[4] << 16) | ((uint64_t)ptr[3] << 24) |
             ((uint64_t)ptr[2] << 32) | ((uint64_t)ptr[1] << 40) |
             ((uint64_t)ptr[0] << 48);
         break;
 
     case 8:
         val = (uint64_t)ptr[7] | ((uint64_t)ptr[6] << 8) |
             ((uint64_t)ptr[5] << 16) | ((uint64_t)ptr[4] << 24) |
             ((uint64_t)ptr[3] << 32) | ((uint64_t)ptr[2] << 40) |
             ((uint64_t)ptr[1] << 48) | ((uint64_t)ptr[0] << 56);
         break;
+
+    default:
+        ss_dassert(false);
+        break;
     }
 
     return val;
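The `unpack_bytes()` switch above is an unrolled reverse-byte-order (big-endian) accumulation: `ptr[0]` is always the most significant byte. The same result can be sketched as a loop (the helper name `unpack_bytes_sketch` is hypothetical, not part of this commit):

```c
#include <stddef.h>
#include <stdint.h>

/* Accumulate `bytes` big-endian bytes into a 64-bit value.
 * ptr[0] is the most significant byte, matching every case of
 * the unrolled switch above. */
static uint64_t unpack_bytes_sketch(const uint8_t *ptr, size_t bytes)
{
    uint64_t val = 0;

    for (size_t i = 0; i < bytes; i++)
    {
        val = (val << 8) | ptr[i];
    }

    return val;
}
```

With input bytes `{0x01, 0x02, 0x03}` and `bytes == 3` this yields `0x010203`, the same as the `case 3` branch.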

View File

@@ -20,6 +20,7 @@
 #include <pthread.h>
 #include <semaphore.h>
 #include <maxscale/log_manager.h>
+#include <maxscale/random_jkiss.h>
 
 using std::cerr;
 using std::cout;
@@ -152,7 +153,7 @@ int main(int argc, char* argv[])
     int rc;
 
     std::ios::sync_with_stdio();
+    random_jkiss_init();
 
     rc = sem_init(&u_semstart, 0, 0);
     ensure(rc == 0);

View File

@ -66,6 +66,7 @@ test1()
int input_counter = 0; int input_counter = 0;
int output_counter = 0; int output_counter = 0;
random_jkiss_init();
hkheartbeat = 0; hkheartbeat = 0;
queue = mxs_queue_alloc(TEST_QUEUE_SIZE, HEARTBEATS_TO_EXPIRE); queue = mxs_queue_alloc(TEST_QUEUE_SIZE, HEARTBEATS_TO_EXPIRE);

View File

@@ -27,7 +27,7 @@
 %pure-parser
 
 /** Prefix all functions */
-%name-prefix="dbfw_yy"
+%name-prefix "dbfw_yy"
 
 /** The pure parser requires one extra parameter */
 %parse-param {void* scanner}

View File

@@ -74,10 +74,10 @@ typedef struct
     char* script; /*< Script to call when state changes occur on servers */
     uint64_t events; /*< enabled events */
     HASHTABLE *server_info; /**< Contains server specific information */
-    bool failover; /**< If simple failover is enabled */
+    bool detect_standalone_master; /**< If standalone master are detected */
     int failcount; /**< How many monitoring cycles servers must be
                         down before failover is initiated */
-    bool failover_recovery; /**< Allow servers to rejoin the cluster in failover mode */
+    bool allow_cluster_recovery; /**< Allow failed servers to rejoin the cluster */
     bool warn_failover; /**< Log a warning when failover happens */
 } MYSQL_MONITOR;

View File

@@ -125,9 +125,9 @@ MXS_MODULE* MXS_CREATE_MODULE()
             {"detect_stale_slave", MXS_MODULE_PARAM_BOOL, "true"},
             {"mysql51_replication", MXS_MODULE_PARAM_BOOL, "false"},
             {"multimaster", MXS_MODULE_PARAM_BOOL, "false"},
-            {"failover", MXS_MODULE_PARAM_BOOL, "false"},
+            {"detect_standalone_master", MXS_MODULE_PARAM_BOOL, "false"},
             {"failcount", MXS_MODULE_PARAM_COUNT, "5"},
-            {"failover_recovery", MXS_MODULE_PARAM_BOOL, "true"},
+            {"allow_cluster_recovery", MXS_MODULE_PARAM_BOOL, "true"},
             {
                 "script",
                 MXS_MODULE_PARAM_PATH,
@@ -279,9 +279,9 @@ startMonitor(MXS_MONITOR *monitor, const MXS_CONFIG_PARAMETER* params)
     handle->detectStaleSlave = config_get_bool(params, "detect_stale_slave");
     handle->replicationHeartbeat = config_get_bool(params, "detect_replication_lag");
     handle->multimaster = config_get_bool(params, "multimaster");
-    handle->failover = config_get_bool(params, "failover");
+    handle->detect_standalone_master = config_get_bool(params, "detect_standalone_master");
     handle->failcount = config_get_integer(params, "failcount");
-    handle->failover_recovery = config_get_bool(params, "failover_recovery");
+    handle->allow_cluster_recovery = config_get_bool(params, "allow_cluster_recovery");
     handle->mysql51_replication = config_get_bool(params, "mysql51_replication");
     handle->script = config_copy_string(params, "script");
     handle->events = config_get_enum(params, "events", mxs_monitor_event_enum_values);
@@ -1010,7 +1010,7 @@ void do_failover(MYSQL_MONITOR *handle, MXS_MONITOR_SERVERS *db)
         {
             MXS_WARNING("Failover initiated, server '%s' is now the master.%s",
                         db->server->unique_name,
-                        handle->failover_recovery ?
+                        handle->allow_cluster_recovery ?
                         "" : " All other servers are set into maintenance mode.");
             handle->warn_failover = false;
         }
@@ -1019,7 +1019,7 @@ void do_failover(MYSQL_MONITOR *handle, MXS_MONITOR_SERVERS *db)
             monitor_set_pending_status(db, SERVER_MASTER);
             monitor_clear_pending_status(db, SERVER_SLAVE);
         }
-        else if (!handle->allow_cluster_recovery)
+        else if (!handle->allow_cluster_recovery)
         {
             server_set_status_nolock(db->server, SERVER_MAINT);
             monitor_set_pending_status(db, SERVER_MAINT);
@@ -1298,7 +1298,7 @@ monitorMain(void *arg)
         /** Now that all servers have their status correctly set, we can check
             if we need to do a failover */
-        if (handle->failover)
+        if (handle->detect_standalone_master)
         {
             if (failover_required(handle, mon->databases))
             {

View File

@@ -193,6 +193,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
             {"group_rows", MXS_MODULE_PARAM_COUNT, "1000"},
             {"group_trx", MXS_MODULE_PARAM_COUNT, "1"},
             {"start_index", MXS_MODULE_PARAM_COUNT, "1"},
+            {"block_size", MXS_MODULE_PARAM_COUNT, "0"},
             {"codec", MXS_MODULE_PARAM_ENUM, "null", MXS_MODULE_OPT_ENUM_UNIQUE, codec_values},
             {MXS_END_MODULE_PARAMS}
         }
@@ -416,6 +417,7 @@ createInstance(SERVICE *service, char **options)
     inst->trx_target = config_get_integer(params, "group_trx");
     inst->codec = config_get_enum(params, "codec", codec_values);
     int first_file = config_get_integer(params, "start_index");
+    inst->block_size = config_get_integer(params, "block_size");
 
     MXS_CONFIG_PARAMETER *param = config_get_param(params, "source");
     bool err = false;
@@ -490,6 +492,10 @@ createInstance(SERVICE *service, char **options)
             {
                 first_file = MXS_MAX(1, atoi(value));
             }
+            else if (strcmp(options[i], "block_size") == 0)
+            {
+                inst->block_size = atoi(value);
+            }
             else
             {
                 MXS_WARNING("Unknown router option: '%s'", options[i]);
@@ -1065,14 +1071,20 @@ void converter_func(void* data)
     while (!router->service->svc_do_shutdown && ok && binlog_end == AVRO_OK)
     {
         uint64_t start_pos = router->current_pos;
+        char binlog_name[BINLOG_FNAMELEN + 1];
+        strcpy(binlog_name, router->binlog_name);
 
         if (avro_open_binlog(router->binlogdir, router->binlog_name, &router->binlog_fd))
         {
             binlog_end = avro_read_all_events(router);
 
-            if (router->current_pos != start_pos)
+            if (router->current_pos != start_pos || strcmp(binlog_name, router->binlog_name) != 0)
             {
                 /** We processed some data, reset the conversion task delay */
                 router->task_delay = 1;
+
+                /** Update the GTID index */
+                avro_update_index(router);
             }
 
             avro_close_binlog(router->binlog_fd);

View File

@@ -106,7 +106,7 @@ void avro_close_binlog(int fd)
  * @param filepath Path to the created file
  * @param json_schema The schema of the table in JSON format
  */
-AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec)
+AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec, size_t block_size)
 {
     AVRO_TABLE *table = MXS_CALLOC(1, sizeof(AVRO_TABLE));
     if (table)
@@ -128,7 +128,7 @@ AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, cons
         else
         {
             rc = avro_file_writer_create_with_codec(filepath, table->avro_schema,
-                                                    &table->avro_file, codec, 0);
+                                                    &table->avro_file, codec, block_size);
         }
 
         if (rc)
@@ -884,12 +884,6 @@ void avro_flush_all_tables(AVRO_INSTANCE *router, enum avrorouter_file_op flush)
         }
         hashtable_iterator_free(iter);
     }
-
-    /** Update the GTID index */
-    if (flush == AVROROUTER_FLUSH)
-    {
-        avro_update_index(router);
-    }
 }
 
 /**

View File

@@ -122,7 +122,8 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
             /** Close the file and open a new one */
             hashtable_delete(router->open_tables, table_ident);
             AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema,
-                                                      codec_to_string(router->codec));
+                                                      codec_to_string(router->codec),
+                                                      router->block_size);
 
             if (avro_table)
             {
@@ -306,14 +307,19 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
          * beforehand so we must continue processing them until we reach the end
          * of the event. */
         int rows = 0;
 
         while (ptr - start < hdr->event_size - BINLOG_EVENT_HDR_LEN)
         {
             /** Add the current GTID and timestamp */
-            uint8_t *end = ptr + hdr->event_size;
+            uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
             int event_type = get_event_type(hdr->event_type);
             prepare_record(router, hdr, event_type, &record);
             ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
-            avro_file_writer_append_value(table->avro_file, &record);
+            if (avro_file_writer_append_value(table->avro_file, &record))
+            {
+                MXS_ERROR("Failed to write value at position %ld: %s",
+                          router->current_pos, avro_strerror());
+            }
 
             /** Update rows events have the before and after images of the
              * affected rows so we'll process them as another record with
@@ -322,7 +328,11 @@ bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
             {
                 prepare_record(router, hdr, UPDATE_EVENT_AFTER, &record);
                 ptr = process_row_event_data(map, create, &record, ptr, col_present, end);
-                avro_file_writer_append_value(table->avro_file, &record);
+                if (avro_file_writer_append_value(table->avro_file, &record))
+                {
+                    MXS_ERROR("Failed to write value at position %ld: %s",
+                              router->current_pos, avro_strerror());
+                }
             }
 
             rows++;
@@ -518,14 +528,23 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
     for (long i = 0; i < map->columns && npresent < ncolumns; i++)
     {
         ss_dassert(create->columns == map->columns);
-        avro_value_get_by_name(record, create->column_names[i], &field, NULL);
+        ss_debug(int rc = )avro_value_get_by_name(record, create->column_names[i], &field, NULL);
+        ss_dassert(rc == 0);
 
         if (bit_is_set(columns_present, ncolumns, i))
         {
             npresent++;
             if (bit_is_set(null_bitmap, ncolumns, i))
             {
-                avro_value_set_null(&field);
+                if (column_is_blob(map->column_types[i]))
+                {
+                    uint8_t nullvalue = 0;
+                    avro_value_set_bytes(&field, &nullvalue, 1);
+                }
+                else
+                {
+                    avro_value_set_null(&field);
+                }
             }
             else if (column_is_fixed_string(map->column_types[i]))
             {
@@ -614,8 +633,16 @@ uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create, avro_value
                 uint64_t len = 0;
                 memcpy(&len, ptr, bytes);
                 ptr += bytes;
-                avro_value_set_bytes(&field, ptr, len);
-                ptr += len;
+                if (len)
+                {
+                    avro_value_set_bytes(&field, ptr, len);
+                    ptr += len;
+                }
+                else
+                {
+                    uint8_t nullvalue = 0;
+                    avro_value_set_bytes(&field, &nullvalue, 1);
+                }
                 ss_dassert(ptr < end);
             }
             else if (column_is_temporal(map->column_types[i]))

View File

@@ -281,6 +281,7 @@ typedef struct avro_instance
     uint64_t row_count; /*< Row events processed */
     uint64_t row_target; /*< Minimum about of row events that will trigger
                           * a flush of all tables */
+    uint64_t block_size; /**< Avro datablock size */
     enum mxs_avro_codec_type codec; /**< Avro codec type, defaults to `null` */
     struct avro_instance *next;
 } AVRO_INSTANCE;
@@ -299,7 +300,8 @@ extern void avro_client_rotate(AVRO_INSTANCE *router, AVRO_CLIENT *client, uint8
 extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd);
 extern void avro_close_binlog(int fd);
 extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router);
-extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec);
+extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema,
+                                    const char *codec, size_t block_size);
 extern void avro_table_free(AVRO_TABLE *table);
 extern char* json_new_schema_from_table(TABLE_MAP *map);
 extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);