diff --git a/Documentation/Routers/Avrorouter.md b/Documentation/Routers/Avrorouter.md index f41dc7216..14f824542 100644 --- a/Documentation/Routers/Avrorouter.md +++ b/Documentation/Routers/Avrorouter.md @@ -57,6 +57,15 @@ router=avrorouter source=replication-router ``` +### `codec` + +The compression codec to use. By default, the avrorouter does not use compression. + +This parameter takes one of the following two values; _null_ or +_deflate_. These are the mandatory compression algorithms required by the +Avro specification. For more information about the compression types, +refer to the [Avro specification](https://avro.apache.org/docs/current/spec.html#Required+Codecs). + **Note:** Since the 2.1 version of MaxScale, all of the router options can also be defined as parameters. diff --git a/server/modules/routing/avrorouter/avro.c b/server/modules/routing/avrorouter/avro.c index 49eda9f41..5fa9fd04b 100644 --- a/server/modules/routing/avrorouter/avro.c +++ b/server/modules/routing/avrorouter/avro.c @@ -120,6 +120,15 @@ bool avro_handle_convert(const MODULECMD_ARG *args) return rval; } +static const MXS_ENUM_VALUE codec_values[] = +{ + {"null", MXS_AVRO_CODEC_NULL}, + {"deflate", MXS_AVRO_CODEC_DEFLATE}, +// Not yet implemented +// {"snappy", MXS_AVRO_CODEC_SNAPPY}, + {NULL} +}; + /** * The module entry point routine. It is this routine that * must populate the structure that is referred to as the @@ -184,6 +193,7 @@ MXS_MODULE* MXS_CREATE_MODULE() {"group_rows", MXS_MODULE_PARAM_COUNT, "1000"}, {"group_trx", MXS_MODULE_PARAM_COUNT, "1"}, {"start_index", MXS_MODULE_PARAM_COUNT, "1"}, + {"codec", MXS_MODULE_PARAM_ENUM, "null", MXS_MODULE_OPT_ENUM_UNIQUE, codec_values}, {MXS_END_MODULE_PARAMS} } }; @@ -404,6 +414,7 @@ createInstance(SERVICE *service, char **options) inst->fileroot = MXS_STRDUP_A(config_get_string(params, "filestem")); inst->row_target = config_get_integer(params, "group_rows"); inst->trx_target = config_get_integer(params, "group_trx"); + inst->codec = config_get_enum(params, "codec", codec_values); int first_file = config_get_integer(params, "start_index"); MXS_CONFIG_PARAMETER *param = config_get_param(params, "source"); diff --git a/server/modules/routing/avrorouter/avro_file.c b/server/modules/routing/avrorouter/avro_file.c index 12dfd7836..5ac896a12 100644 --- a/server/modules/routing/avrorouter/avro_file.c +++ b/server/modules/routing/avrorouter/avro_file.c @@ -106,7 +106,7 @@ void avro_close_binlog(int fd) * @param filepath Path to the created file * @param json_schema The schema of the table in JSON format */ -AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema) +AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec) { AVRO_TABLE *table = MXS_CALLOC(1, sizeof(AVRO_TABLE)); if (table) @@ -127,7 +127,8 @@ AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema) } else { - rc = avro_file_writer_create(filepath, table->avro_schema, &table->avro_file); + rc = avro_file_writer_create_with_codec(filepath, table->avro_schema, + &table->avro_file, codec, 0); } if (rc) diff --git a/server/modules/routing/avrorouter/avro_rbr.c b/server/modules/routing/avrorouter/avro_rbr.c index ca6534a3b..7a1aed890 100644 --- a/server/modules/routing/avrorouter/avro_rbr.c +++ b/server/modules/routing/avrorouter/avro_rbr.c @@ -65,6 +65,22 @@ static int get_event_type(uint8_t event) } } +static const char* codec_to_string(enum mxs_avro_codec_type type) +{ + switch (type) + { + case MXS_AVRO_CODEC_NULL: + return "null"; + case MXS_AVRO_CODEC_DEFLATE: + return "deflate"; + case MXS_AVRO_CODEC_SNAPPY: + return "snappy"; + default: + ss_dassert(false); + return "null"; + } +} + /** * @brief Handle a table map event * @@ -105,7 +121,8 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr /** Close the file and open a new one */ hashtable_delete(router->open_tables, table_ident); - AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema); + AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema, + codec_to_string(router->codec)); if (avro_table) { diff --git a/server/modules/routing/avrorouter/avrorouter.h b/server/modules/routing/avrorouter/avrorouter.h index 6ee02a74e..b89dc2826 100644 --- a/server/modules/routing/avrorouter/avrorouter.h +++ b/server/modules/routing/avrorouter/avrorouter.h @@ -186,6 +186,13 @@ enum avro_data_format AVRO_FORMAT_AVRO, }; +enum mxs_avro_codec_type +{ + MXS_AVRO_CODEC_NULL, + MXS_AVRO_CODEC_DEFLATE, + MXS_AVRO_CODEC_SNAPPY, /**< Not yet implemented */ +} ; + typedef struct gtid_pos { uint32_t timestamp; /*< GTID event timestamp */ @@ -274,6 +281,7 @@ typedef struct avro_instance uint64_t row_count; /*< Row events processed */ uint64_t row_target; /*< Minimum about of row events that will trigger * a flush of all tables */ + enum mxs_avro_codec_type codec; /**< Avro codec type, defaults to `null` */ struct avro_instance *next; } AVRO_INSTANCE; @@ -291,7 +299,7 @@ extern void avro_client_rotate(AVRO_INSTANCE *router, AVRO_CLIENT *client, uint8 extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd); extern void avro_close_binlog(int fd); extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router); -extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema); +extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec); extern void avro_table_free(AVRO_TABLE *table); extern char* json_new_schema_from_table(TABLE_MAP *map); extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);