Update avrorouter and the related tutorial
The avrorouter now uses the parameters from the source service. This removes the need for redundant parameter definition in the avrorouter service when they are defined in the binlogrouter service as parameters. Added some missing configuration sanity checks and updated the tutorial to reflect the new configuration method introduced in 2.1.
This commit is contained in:
parent
45b78a795e
commit
25f9cc14b4
@ -5,8 +5,10 @@ This tutorial is a short introduction to the
|
||||
with the binlogrouter.
|
||||
|
||||
The avrorouter can also be deployed directly on the master server which removes
|
||||
the need to use the binlogrouter. This does require a lot more disk space on
|
||||
the master server as both the binlogs and the Avro format files are stored there.
|
||||
the need to use the binlogrouter. This does require a lot more disk space on the
|
||||
master server as both the binlogs and the Avro format files are stored there. It
|
||||
is recommended to deploy the avrorouter and the binlogrouter on a remove server
|
||||
so that the data streaming process has a minimal effect on performance.
|
||||
|
||||
The first part configures the services and sets them up for the binary log to Avro
|
||||
file conversion. The second part of this tutorial uses the client listener
|
||||
@ -44,10 +46,9 @@ from the master and convert them into Avro format files.
|
||||
[replication-service]
|
||||
type=service
|
||||
router=binlogrouter
|
||||
router_options=server-id=4000,
|
||||
master-id=3000,
|
||||
binlogdir=/var/lib/maxscale/binlog/,
|
||||
mariadb10-compatibility=1,
|
||||
server_id=4000
|
||||
master_id=3000
|
||||
filestem=binlog
|
||||
user=maxuser
|
||||
passwd=maxpwd
|
||||
|
||||
@ -56,15 +57,13 @@ passwd=maxpwd
|
||||
type=service
|
||||
router=avrorouter
|
||||
source=replication-service
|
||||
router_options=avrodir=/var/lib/maxscale/avro/,
|
||||
filestem=binlog
|
||||
|
||||
# The listener for the replication-service
|
||||
[replication-listener]
|
||||
type=listener
|
||||
service=replication-router
|
||||
service=replication-service
|
||||
protocol=MySQLClient
|
||||
port=4000
|
||||
port=3306
|
||||
|
||||
# The client listener for the avro-service
|
||||
[avro-listener]
|
||||
@ -72,19 +71,29 @@ type=listener
|
||||
service=avro-service
|
||||
protocol=CDC
|
||||
port=4001
|
||||
|
||||
# The MaxAdmin service and listener for MaxScale administration
|
||||
[CLI]
|
||||
type=service
|
||||
router=cli
|
||||
|
||||
[CLI Listener]
|
||||
type=listener
|
||||
service=CLI
|
||||
protocol=maxscaled
|
||||
socket=default
|
||||
```
|
||||
|
||||
You can see that the `source` parameter in the _avro-service_ points to the
|
||||
_replication-service_ we defined before. This service will be the data source
|
||||
for the avrorouter. The _filestem_ is the prefix in the binlog files and the
|
||||
additional _avrodir_ router_option is where the converted Avro files are stored.
|
||||
For more information on the avrorouter options, read the
|
||||
for the avrorouter. The _filestem_ is the prefix in the binlog files. For more
|
||||
information on the avrorouter options, read the
|
||||
[Avrorouter Documentation](../Routers/Avrorouter.md).
|
||||
|
||||
After the services were defined, we added the listeners for the
|
||||
_replication-service_ and the _avro-service_. The _CDC_ protocol is a new
|
||||
protocol added with the avrorouter and, at the time of writing, it is the only
|
||||
supported protocol for the avrorouter.
|
||||
protocol added with the avrorouter and it is the only supported protocol for the
|
||||
avrorouter.
|
||||
|
||||
# Preparing the data in the master server
|
||||
|
||||
@ -99,6 +108,7 @@ created. There are two ways to do this:
|
||||
|
||||
- Manually create the schema
|
||||
- Use the [_cdc_schema_ Go utilty](../Routers/Avrorouter.md#avro-schema-generator)
|
||||
- Use the [Python version of the schema generator](../../server/modules/protocol/examples/cdc_schema.py)
|
||||
|
||||
All Avro file schemas follow the same general idea. They are in JSON and follow
|
||||
the following format:
|
||||
@ -112,27 +122,35 @@ the following format:
|
||||
[
|
||||
{
|
||||
"name": "name",
|
||||
"type": "string"
|
||||
"type": "string",
|
||||
"real_type": "varchar",
|
||||
"length": 200
|
||||
},
|
||||
{
|
||||
"name":"address",
|
||||
"type":"string"
|
||||
"type":"string",
|
||||
"real_type": "varchar",
|
||||
"length": 200
|
||||
},
|
||||
{
|
||||
"name":"age",
|
||||
"type":"int"
|
||||
"type":"int",
|
||||
"real_type": "int",
|
||||
"length": -1
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
The avrorouter uses the schema file to identify the columns, their names and
|
||||
what type they are. The _name_ field contains the name of the column and the _type_
|
||||
contains the Avro type. Read the [Avro specification](https://avro.apache.org/docs/1.8.1/spec.html)
|
||||
what type they are. The _name_ field contains the name of the column and the
|
||||
_type_ contains the Avro type. Read the [Avro specification](https://avro.apache.org/docs/1.8.1/spec.html)
|
||||
for details on the layout of the schema files.
|
||||
|
||||
All Avro schema files for tables that are not created in the binary logs need to
|
||||
be in the location pointed by the _avrodir_ router_option and must use the following naming: `<database>.<table>.<schema_version>.avsc`. For example, the schema file name of the _test.t1_ table would be `test.t1.0000001.avsc`.
|
||||
be in the location pointed by the _avrodir_ router_option and must use the
|
||||
following naming: `<database>.<table>.<schema_version>.avsc`. For example, the
|
||||
schema file name of the _test.t1_ table would be `test.t1.0000001.avsc`.
|
||||
|
||||
# Starting MariaDB MaxScale
|
||||
|
||||
@ -174,3 +192,17 @@ Avro file, which you can inspect by using the _maxavrocheck_ utility program.
|
||||
File sync marker: caaed7778bbe58e701eec1f96d7719a
|
||||
/home/markusjm/build/avrodata/test.t1.000001.avro: 1 blocks, 1 records and 12 bytes
|
||||
```
|
||||
|
||||
To use the _cdc.py_ command line client to connect to the CDC service, we must first
|
||||
create a user. This can be done via maxadmin by executing the following command.
|
||||
|
||||
```
|
||||
maxadmin call command cdc add_user avro-service maxuser maxpwd
|
||||
```
|
||||
|
||||
This will create the _maxuser:maxpwd_ credentials which can then be used to
|
||||
request a data stream of the `test.t1` table that was created earlier.
|
||||
|
||||
```
|
||||
cdc.py -u maxuser -p maxpwd -h 127.0.0.1 -P 4001 test.t1
|
||||
```
|
||||
|
@ -311,8 +311,23 @@ static bool conversion_task_ctl(AVRO_INSTANCE *inst, bool start)
|
||||
* @param inst Avro router instance
|
||||
* @param options The @c router_options of a binlogrouter instance
|
||||
*/
|
||||
void read_source_service_options(AVRO_INSTANCE *inst, const char** options)
|
||||
void read_source_service_options(AVRO_INSTANCE *inst, const char** options,
|
||||
MXS_CONFIG_PARAMETER* params)
|
||||
{
|
||||
for (MXS_CONFIG_PARAMETER* p = params; p; p = p->next)
|
||||
{
|
||||
if (strcmp(p->name, "binlogdir") == 0)
|
||||
{
|
||||
MXS_FREE(inst->binlogdir);
|
||||
inst->binlogdir = MXS_STRDUP_A(p->value);
|
||||
}
|
||||
else if (strcmp(p->name, "filestem") == 0)
|
||||
{
|
||||
MXS_FREE(inst->fileroot);
|
||||
inst->fileroot = MXS_STRDUP_A(p->value);
|
||||
}
|
||||
}
|
||||
|
||||
if (options)
|
||||
{
|
||||
for (int i = 0; options[i]; i++)
|
||||
@ -328,11 +343,12 @@ void read_source_service_options(AVRO_INSTANCE *inst, const char** options)
|
||||
|
||||
if (strcmp(option, "binlogdir") == 0)
|
||||
{
|
||||
MXS_FREE(inst->binlogdir);
|
||||
inst->binlogdir = MXS_STRDUP_A(value);
|
||||
MXS_INFO("Reading MySQL binlog files from %s", inst->binlogdir);
|
||||
}
|
||||
else if (strcmp(option, "filestem") == 0)
|
||||
{
|
||||
MXS_FREE(inst->fileroot);
|
||||
inst->fileroot = MXS_STRDUP_A(value);
|
||||
}
|
||||
}
|
||||
@ -437,7 +453,8 @@ createInstance(SERVICE *service, char **options)
|
||||
{
|
||||
MXS_NOTICE("[%s] Using configuration options from service '%s'.",
|
||||
service->name, source->name);
|
||||
read_source_service_options(inst, (const char**)source->routerOptions);
|
||||
read_source_service_options(inst, (const char**)source->routerOptions,
|
||||
source->svc_config_param);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -453,6 +470,7 @@ createInstance(SERVICE *service, char **options)
|
||||
|
||||
if (param)
|
||||
{
|
||||
MXS_FREE(inst->binlogdir);
|
||||
inst->binlogdir = MXS_STRDUP_A(param->value);
|
||||
}
|
||||
|
||||
@ -519,6 +537,11 @@ createInstance(SERVICE *service, char **options)
|
||||
MXS_ERROR("No 'binlogdir' option found in source service, in parameters or in router_options.");
|
||||
err = true;
|
||||
}
|
||||
else if (inst->fileroot == NULL)
|
||||
{
|
||||
MXS_ERROR("No 'filestem' option found in source service, in parameters or in router_options.");
|
||||
err = true;
|
||||
}
|
||||
else if (ensure_dir_ok(inst->binlogdir, R_OK) && ensure_dir_ok(inst->avrodir, W_OK))
|
||||
{
|
||||
snprintf(inst->binlog_name, sizeof(inst->binlog_name), BINLOG_NAMEFMT, inst->fileroot, first_file);
|
||||
|
Loading…
x
Reference in New Issue
Block a user