Add deflate compression to avrorouter

The avrorouter can now compress the Avro files using the deflate algorithm
provided by zlib.
This commit is contained in:
Markus Mäkelä
2017-02-21 16:14:59 +02:00
parent c47ef968f7
commit c3cc46ae04
5 changed files with 50 additions and 4 deletions

View File

@ -57,6 +57,15 @@ router=avrorouter
source=replication-router source=replication-router
``` ```
### `codec`
The compression codec to use. By default, the avrorouter does not use compression.
This parameter takes one of the following two values: _null_ (no compression) or
_deflate_. These are the two codecs that the Avro specification requires
every implementation to support. For more information about the compression types,
refer to the [Avro specification](https://avro.apache.org/docs/current/spec.html#Required+Codecs).
**Note:** Since the 2.1 version of MaxScale, all of the router options can also **Note:** Since the 2.1 version of MaxScale, all of the router options can also
be defined as parameters. be defined as parameters.

View File

@ -120,6 +120,15 @@ bool avro_handle_convert(const MODULECMD_ARG *args)
return rval; return rval;
} }
/**
 * Accepted values of the `codec` router parameter, each paired with the
 * mxs_avro_codec_type constant it selects. The same table is given to the
 * module parameter declaration and to config_get_enum() when the instance
 * is created. The list is terminated by a {NULL} entry.
 */
static const MXS_ENUM_VALUE codec_values[] =
{
{"null", MXS_AVRO_CODEC_NULL},
{"deflate", MXS_AVRO_CODEC_DEFLATE},
// Not yet implemented
// {"snappy", MXS_AVRO_CODEC_SNAPPY},
{NULL}
};
/** /**
* The module entry point routine. It is this routine that * The module entry point routine. It is this routine that
* must populate the structure that is referred to as the * must populate the structure that is referred to as the
@ -184,6 +193,7 @@ MXS_MODULE* MXS_CREATE_MODULE()
{"group_rows", MXS_MODULE_PARAM_COUNT, "1000"}, {"group_rows", MXS_MODULE_PARAM_COUNT, "1000"},
{"group_trx", MXS_MODULE_PARAM_COUNT, "1"}, {"group_trx", MXS_MODULE_PARAM_COUNT, "1"},
{"start_index", MXS_MODULE_PARAM_COUNT, "1"}, {"start_index", MXS_MODULE_PARAM_COUNT, "1"},
{"codec", MXS_MODULE_PARAM_ENUM, "null", MXS_MODULE_OPT_ENUM_UNIQUE, codec_values},
{MXS_END_MODULE_PARAMS} {MXS_END_MODULE_PARAMS}
} }
}; };
@ -404,6 +414,7 @@ createInstance(SERVICE *service, char **options)
inst->fileroot = MXS_STRDUP_A(config_get_string(params, "filestem")); inst->fileroot = MXS_STRDUP_A(config_get_string(params, "filestem"));
inst->row_target = config_get_integer(params, "group_rows"); inst->row_target = config_get_integer(params, "group_rows");
inst->trx_target = config_get_integer(params, "group_trx"); inst->trx_target = config_get_integer(params, "group_trx");
inst->codec = config_get_enum(params, "codec", codec_values);
int first_file = config_get_integer(params, "start_index"); int first_file = config_get_integer(params, "start_index");
MXS_CONFIG_PARAMETER *param = config_get_param(params, "source"); MXS_CONFIG_PARAMETER *param = config_get_param(params, "source");

View File

@ -106,7 +106,7 @@ void avro_close_binlog(int fd)
* @param filepath Path to the created file * @param filepath Path to the created file
* @param json_schema The schema of the table in JSON format * @param json_schema The schema of the table in JSON format
*/ */
AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema) AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec)
{ {
AVRO_TABLE *table = MXS_CALLOC(1, sizeof(AVRO_TABLE)); AVRO_TABLE *table = MXS_CALLOC(1, sizeof(AVRO_TABLE));
if (table) if (table)
@ -127,7 +127,8 @@ AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema)
} }
else else
{ {
rc = avro_file_writer_create(filepath, table->avro_schema, &table->avro_file); rc = avro_file_writer_create_with_codec(filepath, table->avro_schema,
&table->avro_file, codec, 0);
} }
if (rc) if (rc)

View File

@ -65,6 +65,22 @@ static int get_event_type(uint8_t event)
} }
} }
/**
 * @brief Convert a codec type into its codec name
 *
 * The name is handed to avro_table_alloc() when a new Avro file is opened.
 * The returned pointer refers to a string literal and must not be freed.
 *
 * @param type Codec type to convert
 * @return Name of the codec; "null" is also returned for unknown values,
 *         after a debug assertion has fired
 */
static const char* codec_to_string(enum mxs_avro_codec_type type)
{
    /* MXS_AVRO_CODEC_NULL and unrecognised values both map to "null" */
    const char *codec_name = "null";

    switch (type)
    {
    case MXS_AVRO_CODEC_NULL:
        break;

    case MXS_AVRO_CODEC_DEFLATE:
        codec_name = "deflate";
        break;

    case MXS_AVRO_CODEC_SNAPPY:
        codec_name = "snappy";
        break;

    default:
        /* A value outside the enum is a programming error */
        ss_dassert(false);
        break;
    }

    return codec_name;
}
/** /**
* @brief Handle a table map event * @brief Handle a table map event
* *
@ -105,7 +121,8 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
/** Close the file and open a new one */ /** Close the file and open a new one */
hashtable_delete(router->open_tables, table_ident); hashtable_delete(router->open_tables, table_ident);
AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema); AVRO_TABLE *avro_table = avro_table_alloc(filepath, json_schema,
codec_to_string(router->codec));
if (avro_table) if (avro_table)
{ {

View File

@ -186,6 +186,13 @@ enum avro_data_format
AVRO_FORMAT_AVRO, AVRO_FORMAT_AVRO,
}; };
/** Compression codecs that can be selected for the generated Avro files */
enum mxs_avro_codec_type
{
    MXS_AVRO_CODEC_NULL    = 0, /**< The `null` codec */
    MXS_AVRO_CODEC_DEFLATE = 1, /**< The `deflate` codec */
    MXS_AVRO_CODEC_SNAPPY  = 2  /**< Not yet implemented */
};
typedef struct gtid_pos typedef struct gtid_pos
{ {
uint32_t timestamp; /*< GTID event timestamp */ uint32_t timestamp; /*< GTID event timestamp */
@ -274,6 +281,7 @@ typedef struct avro_instance
uint64_t row_count; /*< Row events processed */ uint64_t row_count; /*< Row events processed */
uint64_t row_target; /*< Minimum about of row events that will trigger uint64_t row_target; /*< Minimum about of row events that will trigger
* a flush of all tables */ * a flush of all tables */
enum mxs_avro_codec_type codec; /**< Avro codec type, defaults to `null` */
struct avro_instance *next; struct avro_instance *next;
} AVRO_INSTANCE; } AVRO_INSTANCE;
@ -291,7 +299,7 @@ extern void avro_client_rotate(AVRO_INSTANCE *router, AVRO_CLIENT *client, uint8
extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd); extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd);
extern void avro_close_binlog(int fd); extern void avro_close_binlog(int fd);
extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router); extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router);
extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema); extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec);
extern void avro_table_free(AVRO_TABLE *table); extern void avro_table_free(AVRO_TABLE *table);
extern char* json_new_schema_from_table(TABLE_MAP *map); extern char* json_new_schema_from_table(TABLE_MAP *map);
extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map); extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);