Uncrustify maxscale

See script directory for method. The script to run in the top level
MaxScale directory is called maxscale-uncrustify.sh, which uses
another script, list-src, from the same directory (so you need to set
your PATH). The uncrustify version was 0.66.
This commit is contained in:
Niclas Antti
2018-09-09 22:26:19 +03:00
parent fa7ec95069
commit c447e5cf15
849 changed files with 35002 additions and 27238 deletions

View File

@ -77,7 +77,7 @@ void Avro::read_source_service_options(SERVICE* source)
}
}
for (const auto& opt: mxs::strtok(config_get_string(params, "router_options"), ", \t"))
for (const auto& opt : mxs::strtok(config_get_string(params, "router_options"), ", \t"))
{
auto kv = mxs::strtok(opt, "=");
@ -92,15 +92,15 @@ void Avro::read_source_service_options(SERVICE* source)
}
}
//static
// static
Avro* Avro::create(SERVICE* service, SRowEventHandler handler)
{
SERVICE* source_service = NULL;
MXS_CONFIG_PARAMETER *param = config_get_param(service->svc_config_param, "source");
MXS_CONFIG_PARAMETER* param = config_get_param(service->svc_config_param, "source");
if (param)
{
SERVICE *source = service_find(param->value);
SERVICE* source = service_find(param->value);
mxb_assert(source);
if (source)
@ -113,7 +113,9 @@ Avro* Avro::create(SERVICE* service, SRowEventHandler handler)
else
{
MXS_ERROR("Service '%s' uses router module '%s' instead of "
"'binlogrouter'.", source->name, source->routerModule);
"'binlogrouter'.",
source->name,
source->routerModule);
return NULL;
}
}
@ -124,23 +126,23 @@ Avro* Avro::create(SERVICE* service, SRowEventHandler handler)
}
}
return new (std::nothrow) Avro(service, service->svc_config_param, source_service, handler);
return new( std::nothrow) Avro(service, service->svc_config_param, source_service, handler);
}
Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRowEventHandler handler):
service(service),
filestem(config_get_string(params, "filestem")),
binlogdir(config_get_string(params, "binlogdir")),
avrodir(config_get_string(params, "avrodir")),
current_pos(4),
binlog_fd(-1),
trx_count(0),
trx_target(config_get_integer(params, "group_trx")),
row_count(0),
row_target(config_get_integer(params, "group_rows")),
task_handle(0),
handler(service, handler, config_get_compiled_regex(params, "match", 0, NULL),
config_get_compiled_regex(params, "exclude", 0, NULL))
Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRowEventHandler handler)
: service(service)
, filestem(config_get_string(params, "filestem"))
, binlogdir(config_get_string(params, "binlogdir"))
, avrodir(config_get_string(params, "avrodir"))
, current_pos(4)
, binlog_fd(-1)
, trx_count(0)
, trx_target(config_get_integer(params, "group_trx"))
, row_count(0)
, row_target(config_get_integer(params, "group_rows"))
, task_handle(0)
, handler(service, handler, config_get_compiled_regex(params, "match", 0, NULL),
config_get_compiled_regex(params, "exclude", 0, NULL))
{
if (source)
{
@ -148,7 +150,10 @@ Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRow
}
char filename[BINLOG_FNAMELEN + 1];
snprintf(filename, sizeof(filename), BINLOG_NAMEFMT, filestem.c_str(),
snprintf(filename,
sizeof(filename),
BINLOG_NAMEFMT,
filestem.c_str(),
config_get_integer(params, "start_index"));
binlog_name = filename;

View File

@ -47,7 +47,7 @@ enum
AVRO_CLIENT_ERRORED,
};
int AvroSession::routeQuery(GWBUF *queue)
int AvroSession::routeQuery(GWBUF* queue)
{
int rval = 1;
@ -57,6 +57,7 @@ int AvroSession::routeQuery(GWBUF *queue)
/* force disconnection */
return 0;
break;
case AVRO_CLIENT_UNREGISTERED:
if (do_registration(queue) == 0)
{
@ -77,6 +78,7 @@ int AvroSession::routeQuery(GWBUF *queue)
dcb->remote != NULL ? dcb->remote : "");
}
break;
case AVRO_CLIENT_REGISTERED:
case AVRO_CLIENT_REQUEST_DATA:
if (state == AVRO_CLIENT_REGISTERED)
@ -88,6 +90,7 @@ int AvroSession::routeQuery(GWBUF *queue)
process_command(queue);
break;
default:
state = AVRO_CLIENT_ERRORED;
rval = 0;
@ -106,17 +109,17 @@ int AvroSession::routeQuery(GWBUF *queue)
*
* @return 1 on successful registration, 0 on error
*/
int AvroSession::do_registration(GWBUF *data)
int AvroSession::do_registration(GWBUF* data)
{
const char reg_uuid[] = "REGISTER UUID=";
const int reg_uuid_len = strlen(reg_uuid);
int data_len = GWBUF_LENGTH(data) - reg_uuid_len;
char *request = (char*)GWBUF_DATA(data);
char* request = (char*)GWBUF_DATA(data);
int ret = 0;
if (strstr(request, reg_uuid) != NULL)
{
char *sep_ptr;
char* sep_ptr;
int uuid_len = (data_len > CDC_UUID_LEN) ? CDC_UUID_LEN : data_len;
/* 36 +1 */
char client_uuid[uuid_len + 1];
@ -148,7 +151,7 @@ int AvroSession::do_registration(GWBUF *data)
if (data_len > 0)
{
/* Check for CDC request type */
char *tmp_ptr = strstr(request + sizeof(reg_uuid) + uuid_len, "TYPE=");
char* tmp_ptr = strstr(request + sizeof(reg_uuid) + uuid_len, "TYPE=");
if (tmp_ptr)
{
if (memcmp(tmp_ptr + 5, "AVRO", 4) == 0)
@ -187,9 +190,9 @@ int AvroSession::do_registration(GWBUF *data)
* @param start
* @param end
*/
void extract_gtid_request(gtid_pos_t *gtid, const char *start, int len)
void extract_gtid_request(gtid_pos_t* gtid, const char* start, int len)
{
const char *ptr = start;
const char* ptr = start;
int read = 0;
while (ptr < start + len)
@ -200,15 +203,17 @@ void extract_gtid_request(gtid_pos_t *gtid, const char *start, int len)
}
else
{
char *end;
char* end;
switch (read)
{
case 0:
gtid->domain = strtol(ptr, &end, 10);
break;
case 1:
gtid->server_id = strtol(ptr, &end, 10);
break;
case 2:
gtid->seq = strtol(ptr, &end, 10);
break;
@ -226,7 +231,7 @@ void extract_gtid_request(gtid_pos_t *gtid, const char *start, int len)
* @param file File to search
* @return True if file exists
*/
bool file_in_dir(const char *dir, const char *file)
bool file_in_dir(const char* dir, const char* file)
{
char path[PATH_MAX + 1];
@ -243,11 +248,11 @@ bool file_in_dir(const char *dir, const char *file)
* @param userdata Data provided when the callback was added
* @return Always 0
*/
int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata)
int avro_client_callback(DCB* dcb, DCB_REASON reason, void* userdata)
{
if (reason == DCB_REASON_DRAINED)
{
AvroSession *client = static_cast<AvroSession*>(userdata);
AvroSession* client = static_cast<AvroSession*>(userdata);
client->client_callback();
}
@ -283,9 +288,9 @@ std::pair<std::string, std::string> get_avrofile_and_gtid(std::string file)
auto first_dot = filename.find_first_of('.');
auto last_dot = filename.find_last_of('.');
if (first_dot != std::string::npos &&
last_dot != std::string::npos &&
first_dot != last_dot)
if (first_dot != std::string::npos
&& last_dot != std::string::npos
&& first_dot != last_dot)
{
// Exact file version specified e.g. test.t1.000002
filename += ".avro";
@ -305,7 +310,7 @@ std::pair<std::string, std::string> get_avrofile_and_gtid(std::string file)
* @param data Buffer containing the command
*
*/
void AvroSession::process_command(GWBUF *queue)
void AvroSession::process_command(GWBUF* queue)
{
const char req_data[] = "REQUEST-DATA";
const size_t req_data_len = sizeof(req_data) - 1;
@ -313,11 +318,11 @@ void AvroSession::process_command(GWBUF *queue)
uint8_t data[buflen + 1];
gwbuf_copy_data(queue, 0, buflen, data);
data[buflen] = '\0';
char *command_ptr = strstr((char *)data, req_data);
char* command_ptr = strstr((char*)data, req_data);
if (command_ptr != NULL)
{
char *file_ptr = command_ptr + req_data_len;
char* file_ptr = command_ptr + req_data_len;
int data_len = GWBUF_LENGTH(queue) - req_data_len;
if (data_len > 1)
@ -354,21 +359,21 @@ void AvroSession::process_command(GWBUF *queue)
else
{
const char err[] = "ERR: Unknown command\n";
GWBUF *reply = gwbuf_alloc_and_load(sizeof(err), err);
GWBUF* reply = gwbuf_alloc_and_load(sizeof(err), err);
dcb->func.write(dcb, reply);
}
}
static int send_row(DCB *dcb, json_t* row)
static int send_row(DCB* dcb, json_t* row)
{
char *json = json_dumps(row, JSON_PRESERVE_ORDER);
char* json = json_dumps(row, JSON_PRESERVE_ORDER);
size_t len = strlen(json);
GWBUF *buf = gwbuf_alloc(len + 1);
GWBUF* buf = gwbuf_alloc(len + 1);
int rc = 0;
if (json && buf)
{
uint8_t *data = GWBUF_DATA(buf);
uint8_t* data = GWBUF_DATA(buf);
memcpy(data, json, len);
data[len] = '\n';
rc = dcb->func.write(dcb, buf);
@ -382,9 +387,9 @@ static int send_row(DCB *dcb, json_t* row)
return rc;
}
void AvroSession::set_current_gtid(json_t *row)
void AvroSession::set_current_gtid(json_t* row)
{
json_t *obj = json_object_get(row, avro_sequence);
json_t* obj = json_object_get(row, avro_sequence);
mxb_assert(json_is_integer(obj));
gtid.seq = json_integer_value(obj);
@ -410,7 +415,7 @@ bool AvroSession::stream_json()
do
{
json_t *row;
json_t* row;
int rc = 1;
while (rc > 0 && (row = maxavro_record_read_json(file_handle)))
{
@ -434,7 +439,7 @@ bool AvroSession::stream_json()
*/
bool AvroSession::stream_binary()
{
GWBUF *buffer;
GWBUF* buffer;
uint64_t bytes = 0;
int rc = 1;
@ -473,10 +478,10 @@ bool AvroSession::seek_to_gtid()
do
{
json_t *row;
json_t* row;
while ((row = maxavro_record_read_json(file_handle)))
{
json_t *obj = json_object_get(row, avro_sequence);
json_t* obj = json_object_get(row, avro_sequence);
mxb_assert(json_is_integer(obj));
uint64_t value = json_integer_value(obj);
@ -495,8 +500,12 @@ bool AvroSession::seek_to_gtid()
if (value == gtid.domain)
{
MXS_INFO("Found GTID %lu-%lu-%lu for %s@%s", gtid.domain,
gtid.server_id, gtid.seq, dcb->user, dcb->remote);
MXS_INFO("Found GTID %lu-%lu-%lu for %s@%s",
gtid.domain,
gtid.server_id,
gtid.seq,
dcb->user,
dcb->remote);
seeking = false;
}
}
@ -595,7 +604,9 @@ GWBUF* read_avro_json_schema(std::string avrofile, std::string dir)
}
else
{
MXS_ERROR("Failed to open file '%s': %d, %s", schemafile.c_str(), errno,
MXS_ERROR("Failed to open file '%s': %d, %s",
schemafile.c_str(),
errno,
mxs_strerror(errno));
}
@ -606,7 +617,7 @@ GWBUF* read_avro_binary_schema(std::string avrofile, std::string dir)
{
GWBUF* rval = NULL;
std::string filename = dir + '/' + avrofile;
MAXAVRO_FILE *file = maxavro_file_open(filename.c_str());
MAXAVRO_FILE* file = maxavro_file_open(filename.c_str());
if (file)
{
@ -641,8 +652,10 @@ void AvroSession::rotate_avro_file(std::string fullname)
}
else
{
MXS_INFO("Rotated '%s'@'%s' to file: %s", dcb->user,
dcb->remote, fullname.c_str());
MXS_INFO("Rotated '%s'@'%s' to file: %s",
dcb->user,
dcb->remote,
fullname.c_str());
}
}
@ -668,8 +681,12 @@ static std::string get_next_filename(std::string file, std::string dir)
// Print it out the new filename with the file number incremented by one
char outbuf[PATH_MAX + 1];
snprintf(outbuf, sizeof(outbuf), "%s/%s.%06d.avro",
dir.c_str(), file_part.c_str(), filenum + 1);
snprintf(outbuf,
sizeof(outbuf),
"%s/%s.%06d.avro",
dir.c_str(),
file_part.c_str(),
filenum + 1);
return std::string(outbuf);
}
@ -682,7 +699,7 @@ void AvroSession::client_callback()
last_sent_pos = 1;
/** Send the schema of the current file */
GWBUF *schema = NULL;
GWBUF* schema = NULL;
switch (format)
{
@ -725,19 +742,19 @@ void AvroSession::client_callback()
// static
AvroSession* AvroSession::create(Avro* inst, MXS_SESSION* session)
{
return new (std::nothrow) AvroSession(inst, session);
return new( std::nothrow) AvroSession(inst, session);
}
AvroSession::AvroSession(Avro* instance, MXS_SESSION* session):
dcb(session->client_dcb),
state(AVRO_CLIENT_UNREGISTERED),
format(AVRO_FORMAT_UNDEFINED),
catch_lock(SPINLOCK_INIT),
router(instance),
file_handle(NULL),
last_sent_pos(0),
connect_time(time(NULL)),
requested_gtid(false)
AvroSession::AvroSession(Avro* instance, MXS_SESSION* session)
: dcb(session->client_dcb)
, state(AVRO_CLIENT_UNREGISTERED)
, format(AVRO_FORMAT_UNDEFINED)
, catch_lock(SPINLOCK_INIT)
, router(instance)
, file_handle(NULL)
, last_sent_pos(0)
, connect_time(time(NULL))
, requested_gtid(false)
{
}

View File

@ -26,14 +26,17 @@
* @param filepath Path to the created file
* @param json_schema The schema of the table in JSON format
*/
AvroTable* avro_table_alloc(const char* filepath, const char* json_schema, const char *codec,
AvroTable* avro_table_alloc(const char* filepath,
const char* json_schema,
const char* codec,
size_t block_size)
{
avro_file_writer_t avro_file;
avro_value_iface_t* avro_writer_iface;
avro_schema_t avro_schema;
if (avro_schema_from_json_length(json_schema, strlen(json_schema),
if (avro_schema_from_json_length(json_schema,
strlen(json_schema),
&avro_schema))
{
MXS_ERROR("Avro error: %s", avro_strerror());
@ -49,8 +52,11 @@ AvroTable* avro_table_alloc(const char* filepath, const char* json_schema, const
}
else
{
rc = avro_file_writer_create_with_codec(filepath, avro_schema,
&avro_file, codec, block_size);
rc = avro_file_writer_create_with_codec(filepath,
avro_schema,
&avro_file,
codec,
block_size);
}
if (rc)
@ -68,7 +74,7 @@ AvroTable* avro_table_alloc(const char* filepath, const char* json_schema, const
return NULL;
}
AvroTable* table = new (std::nothrow) AvroTable(avro_file, avro_writer_iface, avro_schema);
AvroTable* table = new( std::nothrow) AvroTable(avro_file, avro_writer_iface, avro_schema);
if (!table)
{
@ -138,48 +144,103 @@ char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEv
if (map->version != create->version)
{
MXS_ERROR("Version mismatch for table %s.%s. Table map version is %d and "
"the table definition version is %d.", map->database.c_str(),
map->table.c_str(), map->version, create->version);
mxb_assert(!true); // Should not happen
"the table definition version is %d.",
map->database.c_str(),
map->table.c_str(),
map->version,
create->version);
mxb_assert(!true); // Should not happen
return NULL;
}
json_error_t err;
memset(&err, 0, sizeof(err));
json_t *schema = json_object();
json_t* schema = json_object();
json_object_set_new(schema, "namespace", json_string("MaxScaleChangeDataSchema.avro"));
json_object_set_new(schema, "type", json_string("record"));
json_object_set_new(schema, "name", json_string("ChangeRecord"));
json_t *array = json_array();
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
avro_domain, "type", "int"));
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
avro_server_id, "type", "int"));
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
avro_sequence, "type", "int"));
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
avro_event_number, "type", "int"));
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s}", "name",
avro_timestamp, "type", "int"));
json_t* array = json_array();
json_array_append_new(array,
json_pack_ex(&err,
0,
"{s:s, s:s}",
"name",
avro_domain,
"type",
"int"));
json_array_append_new(array,
json_pack_ex(&err,
0,
"{s:s, s:s}",
"name",
avro_server_id,
"type",
"int"));
json_array_append_new(array,
json_pack_ex(&err,
0,
"{s:s, s:s}",
"name",
avro_sequence,
"type",
"int"));
json_array_append_new(array,
json_pack_ex(&err,
0,
"{s:s, s:s}",
"name",
avro_event_number,
"type",
"int"));
json_array_append_new(array,
json_pack_ex(&err,
0,
"{s:s, s:s}",
"name",
avro_timestamp,
"type",
"int"));
/** Enums and other complex types are defined with complete JSON objects
* instead of string values */
json_t *event_types = json_pack_ex(&err, 0, "{s:s, s:s, s:[s,s,s,s]}", "type", "enum",
"name", "EVENT_TYPES", "symbols", "insert",
"update_before", "update_after", "delete");
json_t* event_types = json_pack_ex(&err,
0,
"{s:s, s:s, s:[s,s,s,s]}",
"type",
"enum",
"name",
"EVENT_TYPES",
"symbols",
"insert",
"update_before",
"update_after",
"delete");
// Ownership of `event_types` is stolen when using the `o` format
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:o}", "name", avro_event_type,
"type", event_types));
json_array_append_new(array,
json_pack_ex(&err,
0,
"{s:s, s:o}",
"name",
avro_event_type,
"type",
event_types));
for (uint64_t i = 0; i < map->columns() && i < create->columns.size(); i++)
{
json_array_append_new(array, json_pack_ex(&err, 0, "{s:s, s:s, s:s, s:i}",
"name", create->columns[i].name.c_str(),
"type", column_type_to_avro_type(map->column_types[i]),
"real_type", create->columns[i].type.c_str(),
"length", create->columns[i].length));
json_array_append_new(array,
json_pack_ex(&err,
0,
"{s:s, s:s, s:s, s:i}",
"name",
create->columns[i].name.c_str(),
"type",
column_type_to_avro_type(map->column_types[i]),
"real_type",
create->columns[i].type.c_str(),
"length",
create->columns[i].length));
}
json_object_set_new(schema, "fields", array);
char* rval = json_dumps(schema, JSON_PRESERVE_ORDER);
@ -194,18 +255,25 @@ char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEv
* @param schema Schema in JSON format
* @param map Table map that @p schema represents
*/
void save_avro_schema(const char *path, const char* schema, const STableMapEvent& map,
void save_avro_schema(const char* path,
const char* schema,
const STableMapEvent& map,
const STableCreateEvent& create)
{
char filepath[PATH_MAX];
snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avsc", path,
map->database.c_str(), map->table.c_str(), map->version);
snprintf(filepath,
sizeof(filepath),
"%s/%s.%s.%06d.avsc",
path,
map->database.c_str(),
map->table.c_str(),
map->version);
if (access(filepath, F_OK) != 0)
{
if (!create->was_used)
{
FILE *file = fopen(filepath, "wb");
FILE* file = fopen(filepath, "wb");
if (file)
{
fprintf(file, "%s\n", schema);
@ -221,20 +289,23 @@ static const char* codec_to_string(enum mxs_avro_codec_type type)
{
case MXS_AVRO_CODEC_NULL:
return "null";
case MXS_AVRO_CODEC_DEFLATE:
return "deflate";
case MXS_AVRO_CODEC_SNAPPY:
return "snappy";
default:
mxb_assert(false);
return "null";
}
}
AvroConverter::AvroConverter(std::string avrodir, uint64_t block_size, mxs_avro_codec_type codec):
m_avrodir(avrodir),
m_block_size(block_size),
m_codec(codec)
AvroConverter::AvroConverter(std::string avrodir, uint64_t block_size, mxs_avro_codec_type codec)
: m_avrodir(avrodir)
, m_block_size(block_size)
, m_codec(codec)
{
}
@ -246,10 +317,16 @@ bool AvroConverter::open_table(const STableMapEvent& map, const STableCreateEven
if (json_schema)
{
char filepath[PATH_MAX + 1];
snprintf(filepath, sizeof(filepath), "%s/%s.%s.%06d.avro", m_avrodir.c_str(),
map->database.c_str(), map->table.c_str(), map->version);
snprintf(filepath,
sizeof(filepath),
"%s/%s.%s.%06d.avro",
m_avrodir.c_str(),
map->database.c_str(),
map->table.c_str(),
map->version);
SAvroTable avro_table(avro_table_alloc(filepath, json_schema,
SAvroTable avro_table(avro_table_alloc(filepath,
json_schema,
codec_to_string(m_codec),
m_block_size));
@ -386,7 +463,9 @@ void AvroConverter::column(int i)
void AvroConverter::set_active(int i)
{
MXB_AT_DEBUG(int rc =)avro_value_get_by_name(&m_record, m_create->columns[i].name.c_str(),
&m_field, NULL);
MXB_AT_DEBUG(int rc = ) avro_value_get_by_name(&m_record,
m_create->columns[i].name.c_str(),
&m_field,
NULL);
mxb_assert(rc == 0);
}

View File

@ -10,7 +10,7 @@
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#pragma once
#pragma once
#include "avrorouter.hh"
#include "rpl.hh"
@ -19,10 +19,10 @@
struct AvroTable
{
AvroTable(avro_file_writer_t file, avro_value_iface_t* iface, avro_schema_t schema):
avro_file(file),
avro_writer_iface(iface),
avro_schema(schema)
AvroTable(avro_file_writer_t file, avro_value_iface_t* iface, avro_schema_t schema)
: avro_file(file)
, avro_writer_iface(iface)
, avro_schema(schema)
{
}
@ -34,9 +34,9 @@ struct AvroTable
avro_schema_decref(avro_schema);
}
avro_file_writer_t avro_file; /*< Current Avro data file */
avro_value_iface_t* avro_writer_iface; /*< Avro C API writer interface */
avro_schema_t avro_schema; /*< Native Avro schema of the table */
avro_file_writer_t avro_file; /*< Current Avro data file */
avro_value_iface_t* avro_writer_iface; /*< Avro C API writer interface */
avro_schema_t avro_schema; /*< Native Avro schema of the table */
};
typedef std::shared_ptr<AvroTable> SAvroTable;

View File

@ -38,7 +38,7 @@
#include <maxscale/pcre2.h>
#include <maxscale/utils.h>
static const char *statefile_section = "avro-conversion";
static const char* statefile_section = "avro-conversion";
/**
@ -47,7 +47,7 @@ static const char *statefile_section = "avro-conversion";
* @param router The router instance
* @param file The binlog file name
*/
bool avro_open_binlog(const char *binlogdir, const char *file, int *dest)
bool avro_open_binlog(const char* binlogdir, const char* file, int* dest)
{
char path[PATH_MAX + 1] = "";
int fd;
@ -58,7 +58,9 @@ bool avro_open_binlog(const char *binlogdir, const char *file, int *dest)
{
if (errno != ENOENT)
{
MXS_ERROR("Failed to open binlog file %s: %d, %s", path, errno,
MXS_ERROR("Failed to open binlog file %s: %d, %s",
path,
errno,
mxs_strerror(errno));
}
return false;
@ -94,9 +96,9 @@ void avro_close_binlog(int fd)
* @return True if the file was written successfully to disk
*
*/
bool avro_save_conversion_state(Avro *router)
bool avro_save_conversion_state(Avro* router)
{
FILE *config_file;
FILE* config_file;
char filename[PATH_MAX + 1];
snprintf(filename, sizeof(filename), "%s/" AVRO_PROGRESS_FILE ".tmp", router->avrodir.c_str());
@ -106,16 +108,22 @@ bool avro_save_conversion_state(Avro *router)
if (config_file == NULL)
{
MXS_ERROR("Failed to open file '%s': %d, %s", filename,
errno, mxs_strerror(errno));
MXS_ERROR("Failed to open file '%s': %d, %s",
filename,
errno,
mxs_strerror(errno));
return false;
}
gtid_pos_t gtid = router->handler.get_gtid();
fprintf(config_file, "[%s]\n", statefile_section);
fprintf(config_file, "position=%lu\n", router->current_pos);
fprintf(config_file, "gtid=%lu-%lu-%lu:%lu\n", gtid.domain,
gtid.server_id, gtid.seq, gtid.event_num);
fprintf(config_file,
"gtid=%lu-%lu-%lu:%lu\n",
gtid.domain,
gtid.server_id,
gtid.seq,
gtid.event_num);
fprintf(config_file, "file=%s\n", router->binlog_name.c_str());
fclose(config_file);
@ -126,8 +134,11 @@ bool avro_save_conversion_state(Avro *router)
if (rc == -1)
{
MXS_ERROR("Failed to rename file '%s' to '%s': %d, %s", filename, newname,
errno, mxs_strerror(errno));
MXS_ERROR("Failed to rename file '%s' to '%s': %d, %s",
filename,
newname,
errno,
mxs_strerror(errno));
return false;
}
@ -145,14 +156,14 @@ bool avro_save_conversion_state(Avro *router)
*/
static int conv_state_handler(void* data, const char* section, const char* key, const char* value)
{
Avro *router = (Avro*) data;
Avro* router = (Avro*) data;
if (strcmp(section, statefile_section) == 0)
{
if (strcmp(key, "gtid") == 0)
{
gtid_pos_t gtid;
MXB_AT_DEBUG(bool rval = )gtid.parse(value);
MXB_AT_DEBUG(bool rval = ) gtid.parse(value);
mxb_assert(rval);
router->handler.set_gtid(gtid);
}
@ -167,7 +178,9 @@ static int conv_state_handler(void* data, const char* section, const char* key,
if (len > BINLOG_FNAMELEN)
{
MXS_ERROR("Provided value %s for key 'file' is too long. "
"The maximum allowed length is %d.", value, BINLOG_FNAMELEN);
"The maximum allowed length is %d.",
value,
BINLOG_FNAMELEN);
return 0;
}
@ -188,7 +201,7 @@ static int conv_state_handler(void* data, const char* section, const char* key,
* @param router Avro router instance
* @return True if the stored state was loaded successfully
*/
bool avro_load_conversion_state(Avro *router)
bool avro_load_conversion_state(Avro* router)
{
char filename[PATH_MAX + 1];
bool rval = false;
@ -211,9 +224,14 @@ bool avro_load_conversion_state(Avro *router)
{
rval = true;
gtid_pos_t gtid = router->handler.get_gtid();
MXS_NOTICE("Loaded stored binary log conversion state: File: [%s] Position: [%ld] GTID: [%lu-%lu-%lu:%lu]",
router->binlog_name.c_str(), router->current_pos, gtid.domain,
gtid.server_id, gtid.seq, gtid.event_num);
MXS_NOTICE(
"Loaded stored binary log conversion state: File: [%s] Position: [%ld] GTID: [%lu-%lu-%lu:%lu]",
router->binlog_name.c_str(),
router->current_pos,
gtid.domain,
gtid.server_id,
gtid.seq,
gtid.event_num);
}
break;
@ -227,7 +245,9 @@ bool avro_load_conversion_state(Avro *router)
default:
MXS_ERROR("Failed to parse stored conversion state '%s', error "
"on line %d. ", filename, rc);
"on line %d. ",
filename,
rc);
break;
}
@ -250,18 +270,23 @@ static avro_binlog_end_t rotate_to_next_file_if_exists(Avro* router, uint64_t po
if (binlog_next_file_exists(router->binlogdir.c_str(), router->binlog_name.c_str()))
{
char next_binlog[BINLOG_FNAMELEN + 1];
if (snprintf(next_binlog, sizeof(next_binlog),
BINLOG_NAMEFMT, router->filestem.c_str(),
blr_file_get_next_binlogname(router->binlog_name.c_str())) >= (int)sizeof(next_binlog))
if (snprintf(next_binlog,
sizeof(next_binlog),
BINLOG_NAMEFMT,
router->filestem.c_str(),
blr_file_get_next_binlogname(router->binlog_name.c_str())) >= (int)sizeof(next_binlog))
{
MXS_ERROR("Next binlog name did not fit into the allocated buffer "
"but was truncated, aborting: %s", next_binlog);
"but was truncated, aborting: %s",
next_binlog);
rval = AVRO_BINLOG_ERROR;
}
else
{
MXS_INFO("End of binlog file [%s] at %lu. Rotating to next binlog file [%s].",
router->binlog_name.c_str(), pos, next_binlog);
router->binlog_name.c_str(),
pos,
next_binlog);
rval = AVRO_OK;
router->binlog_name = next_binlog;
router->current_pos = 4;
@ -280,10 +305,12 @@ static avro_binlog_end_t rotate_to_next_file_if_exists(Avro* router, uint64_t po
* @param pos Current position, only used for logging
* @param next_binlog The next file to rotate to
*/
static void rotate_to_file(Avro* router, uint64_t pos, const char *next_binlog)
static void rotate_to_file(Avro* router, uint64_t pos, const char* next_binlog)
{
MXS_NOTICE("End of binlog file [%s] at %lu. Rotating to file [%s].",
router->binlog_name.c_str(), pos, next_binlog);
router->binlog_name.c_str(),
pos,
next_binlog);
router->binlog_name = next_binlog;
router->current_pos = 4;
}
@ -296,14 +323,16 @@ static void rotate_to_file(Avro* router, uint64_t pos, const char *next_binlog)
* @param pos Starting position of the event header
* @return The event data or NULL if an error occurred
*/
static GWBUF* read_event_data(Avro *router, REP_HEADER* hdr, uint64_t pos)
static GWBUF* read_event_data(Avro* router, REP_HEADER* hdr, uint64_t pos)
{
GWBUF* result;
/* Allocate a GWBUF for the event */
if ((result = gwbuf_alloc(hdr->event_size - BINLOG_EVENT_HDR_LEN + 1)))
{
uint8_t *data = GWBUF_DATA(result);
int n = pread(router->binlog_fd, data, hdr->event_size - BINLOG_EVENT_HDR_LEN,
uint8_t* data = GWBUF_DATA(result);
int n = pread(router->binlog_fd,
data,
hdr->event_size - BINLOG_EVENT_HDR_LEN,
pos + BINLOG_EVENT_HDR_LEN);
/** NULL-terminate for QUERY_EVENT processing */
data[hdr->event_size - BINLOG_EVENT_HDR_LEN] = '\0';
@ -314,7 +343,8 @@ static GWBUF* read_event_data(Avro *router, REP_HEADER* hdr, uint64_t pos)
{
MXS_ERROR("Error reading the event at %lu in %s. "
"%s, expected %d bytes.",
pos, router->binlog_name.c_str(),
pos,
router->binlog_name.c_str(),
mxs_strerror(errno),
hdr->event_size - BINLOG_EVENT_HDR_LEN);
}
@ -322,8 +352,10 @@ static GWBUF* read_event_data(Avro *router, REP_HEADER* hdr, uint64_t pos)
{
MXS_ERROR("Short read when reading the event at %lu in %s. "
"Expected %d bytes got %d bytes.",
pos, router->binlog_name.c_str(),
hdr->event_size - BINLOG_EVENT_HDR_LEN, n);
pos,
router->binlog_name.c_str(),
hdr->event_size - BINLOG_EVENT_HDR_LEN,
n);
}
gwbuf_free(result);
result = NULL;
@ -333,7 +365,8 @@ static GWBUF* read_event_data(Avro *router, REP_HEADER* hdr, uint64_t pos)
{
MXS_ERROR("Failed to allocate memory for binlog entry, "
"size %d at %lu.",
hdr->event_size, pos);
hdr->event_size,
pos);
}
return result;
}
@ -355,7 +388,7 @@ void notify_all_clients(SERVICE* service)
dcb_foreach(notify_cb, service);
}
void do_checkpoint(Avro *router)
void do_checkpoint(Avro* router)
{
router->handler.flush();
avro_save_conversion_state(router);
@ -378,7 +411,8 @@ bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_bin
case -1:
MXS_ERROR("Failed to read binlog file %s at position %llu (%s).",
router->binlog_name.c_str(), pos,
router->binlog_name.c_str(),
pos,
mxs_strerror(errno));
break;
@ -386,7 +420,9 @@ bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_bin
MXS_ERROR("Short read when reading the header. "
"Expected 19 bytes but got %d bytes. "
"Binlog file is %s, position %llu",
n, router->binlog_name.c_str(), pos);
n,
router->binlog_name.c_str(),
pos);
break;
}
@ -402,7 +438,9 @@ bool read_header(Avro* router, unsigned long long pos, REP_HEADER* hdr, avro_bin
if (hdr->event_type > MAX_EVENT_TYPE_MARIADB10)
{
MXS_ERROR("Invalid MariaDB 10 event type 0x%x. Binlog file is %s, position %llu",
hdr->event_type, router->binlog_name.c_str(), pos);
hdr->event_type,
router->binlog_name.c_str(),
pos);
router->current_pos = pos;
*rc = AVRO_BINLOG_ERROR;
rval = false;
@ -425,12 +463,19 @@ static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos)
if (hdr.next_pos > 0 && hdr.next_pos < pos)
{
MXS_INFO("Binlog %s: next pos %u < pos %lu, truncating to %lu",
router->binlog_name.c_str(), hdr.next_pos, pos, pos);
router->binlog_name.c_str(),
hdr.next_pos,
pos,
pos);
}
else if (hdr.next_pos > 0 && hdr.next_pos != (pos + hdr.event_size))
{
MXS_INFO("Binlog %s: next pos %u != (pos %lu + event_size %u), truncating to %lu",
router->binlog_name.c_str(), hdr.next_pos, pos, hdr.event_size, pos);
router->binlog_name.c_str(),
hdr.next_pos,
pos,
hdr.event_size,
pos);
}
else if (hdr.next_pos > 0)
{
@ -439,7 +484,9 @@ static bool pos_is_ok(Avro* router, const REP_HEADER& hdr, uint64_t pos)
else
{
MXS_ERROR("Current event type %d @ %lu has nex pos = %u : exiting",
hdr.event_type, pos, hdr.next_pos);
hdr.event_type,
pos,
hdr.next_pos);
}
return rval;
@ -453,7 +500,7 @@ bool read_fde(Avro* router)
if (read_header(router, 4, &hdr, &rc))
{
if (GWBUF *result = read_event_data(router, &hdr, 4))
if (GWBUF* result = read_event_data(router, &hdr, 4))
{
router->handler.handle_event(hdr, GWBUF_DATA(result));
rval = true;
@ -479,14 +526,15 @@ bool read_fde(Avro* router)
* @return How the binlog was closed
* @see enum avro_binlog_end
*/
avro_binlog_end_t avro_read_all_events(Avro *router)
avro_binlog_end_t avro_read_all_events(Avro* router)
{
mxb_assert(router->binlog_fd != -1);
if (!read_fde(router))
{
MXS_ERROR("Failed to read the FDE event from the binary log: %d, %s",
errno, mxs_strerror(errno));
errno,
mxs_strerror(errno));
return AVRO_BINLOG_ERROR;
}
@ -517,7 +565,7 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
return rc;
}
GWBUF *result = read_event_data(router, &hdr, pos);
GWBUF* result = read_event_data(router, &hdr, pos);
if (result == NULL)
{
@ -538,7 +586,8 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT)
{
// This appears to need special handling
int annotate_len = hdr.event_size - BINLOG_EVENT_HDR_LEN - (router->handler.have_checksums() ? 4 : 0);
int annotate_len = hdr.event_size - BINLOG_EVENT_HDR_LEN
- (router->handler.have_checksums() ? 4 : 0);
MXS_INFO("Annotate_rows_event: %.*s", annotate_len, ptr);
pos += hdr.event_size;
router->current_pos = pos;
@ -547,8 +596,8 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
}
else
{
if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) ||
(hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2))
if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1)
|| (hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2))
{
router->row_count++;
}
@ -562,8 +611,8 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
gwbuf_free(result);
if (router->row_count >= router->row_target ||
router->trx_count >= router->trx_target)
if (router->row_count >= router->row_target
|| router->trx_count >= router->trx_target)
{
do_checkpoint(router);
}
@ -587,7 +636,7 @@ avro_binlog_end_t avro_read_all_events(Avro *router)
*
* @param router Router instance
*/
void avro_load_metadata_from_schemas(Avro *router)
void avro_load_metadata_from_schemas(Avro* router)
{
char path[PATH_MAX + 1];
snprintf(path, sizeof(path), "%s/*.avsc", router->avrodir.c_str());
@ -603,31 +652,33 @@ void avro_load_metadata_from_schemas(Avro *router)
* them in reverse should give us the newest schema first. */
for (int i = files.gl_pathc - 1; i > -1; i--)
{
char *dbstart = strrchr(files.gl_pathv[i], '/');
char* dbstart = strrchr(files.gl_pathv[i], '/');
mxb_assert(dbstart);
dbstart++;
char *tablestart = strchr(dbstart, '.');
char* tablestart = strchr(dbstart, '.');
mxb_assert(tablestart);
snprintf(db, sizeof(db), "%.*s", (int)(tablestart - dbstart), dbstart);
tablestart++;
char *versionstart = strchr(tablestart, '.');
char* versionstart = strchr(tablestart, '.');
mxb_assert(versionstart);
snprintf(table, sizeof(table), "%.*s", (int)(versionstart - tablestart), tablestart);
versionstart++;
char *suffix = strchr(versionstart, '.');
char *versionend = NULL;
char* suffix = strchr(versionstart, '.');
char* versionend = NULL;
version = strtol(versionstart, &versionend, 10);
if (versionend == suffix)
{
snprintf(table_ident, sizeof(table_ident), "%s.%s", db, table);
STableCreateEvent created(table_create_from_schema(files.gl_pathv[i],
db, table, version));
db,
table,
version));
router->handler.add_create(created);
}
else

View File

@ -45,7 +45,7 @@
using namespace maxbase;
static bool conversion_task_ctl(Avro *inst, bool start);
static bool conversion_task_ctl(Avro* inst, bool start);
/**
* Create an instance of the router for a particular service
@ -60,11 +60,12 @@ static bool conversion_task_ctl(Avro *inst, bool start);
*
* @return The instance data for this new instance
*/
MXS_ROUTER* createInstance(SERVICE *service, MXS_CONFIG_PARAMETER* params)
MXS_ROUTER* createInstance(SERVICE* service, MXS_CONFIG_PARAMETER* params)
{
uint64_t block_size = config_get_size(service->svc_config_param, "block_size");
mxs_avro_codec_type codec = static_cast<mxs_avro_codec_type>(config_get_enum(service->svc_config_param,
"codec", codec_values));
"codec",
codec_values));
std::string avrodir = config_get_string(service->svc_config_param, "avrodir");
SRowEventHandler handler(new AvroConverter(avrodir, block_size, codec));
@ -89,8 +90,7 @@ MXS_ROUTER* createInstance(SERVICE *service, MXS_CONFIG_PARAMETER* params)
* @param session The session itself
* @return Session specific data for this session
*/
static MXS_ROUTER_SESSION *
newSession(MXS_ROUTER *instance, MXS_SESSION *session)
static MXS_ROUTER_SESSION* newSession(MXS_ROUTER* instance, MXS_SESSION* session)
{
Avro* inst = reinterpret_cast<Avro*>(instance);
return AvroSession::create(inst, session);
@ -110,7 +110,7 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
*/
static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_ses)
{
AvroSession *client = (AvroSession *) router_client_ses;
AvroSession* client = (AvroSession*) router_client_ses;
delete client;
}
@ -121,7 +121,7 @@ static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_
* @param instance The router instance data
* @param router_session The session being closed
*/
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
static void closeSession(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session)
{
}
@ -137,10 +137,9 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio
* @param queue The queue of data buffers to route
* @return 1 on success, 0 on error
*/
static int
routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue)
static int routeQuery(MXS_ROUTER* instance, MXS_ROUTER_SESSION* router_session, GWBUF* queue)
{
AvroSession *client = (AvroSession *) router_session;
AvroSession* client = (AvroSession*) router_session;
return client->routeQuery(queue);
}
@ -151,23 +150,29 @@ routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queu
* @param instance Instance of the router
* @param dcb DCB to send diagnostics to
*/
static void
diagnostics(MXS_ROUTER *router, DCB *dcb)
static void diagnostics(MXS_ROUTER* router, DCB* dcb)
{
Avro *router_inst = (Avro *) router;
Avro* router_inst = (Avro*) router;
gtid_pos_t gtid = router_inst->handler.get_gtid();
dcb_printf(dcb, "\tAVRO files directory: %s\n",
dcb_printf(dcb,
"\tAVRO files directory: %s\n",
router_inst->avrodir.c_str());
dcb_printf(dcb, "\tBinlog directory: %s\n",
dcb_printf(dcb,
"\tBinlog directory: %s\n",
router_inst->binlogdir.c_str());
dcb_printf(dcb, "\tCurrent binlog file: %s\n",
dcb_printf(dcb,
"\tCurrent binlog file: %s\n",
router_inst->binlog_name.c_str());
dcb_printf(dcb, "\tCurrent binlog position: %lu\n",
dcb_printf(dcb,
"\tCurrent binlog position: %lu\n",
router_inst->current_pos);
dcb_printf(dcb, "\tCurrent GTID value: %lu-%lu-%lu\n",
gtid.domain, gtid.server_id, gtid.seq);
dcb_printf(dcb,
"\tCurrent GTID value: %lu-%lu-%lu\n",
gtid.domain,
gtid.server_id,
gtid.seq);
dcb_printf(dcb, "\tCurrent GTID timestamp: %u\n", gtid.timestamp);
dcb_printf(dcb, "\tCurrent GTID #events: %lu\n", gtid.event_num);
}
@ -177,9 +182,9 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
*
* @param instance Instance of the router
*/
static json_t* diagnostics_json(const MXS_ROUTER *router)
static json_t* diagnostics_json(const MXS_ROUTER* router)
{
Avro *router_inst = (Avro *)router;
Avro* router_inst = (Avro*)router;
json_t* rval = json_object();
@ -213,8 +218,10 @@ static json_t* diagnostics_json(const MXS_ROUTER *router)
* @param master_dcb The DCB for the connection to the master
* @param queue The GWBUF with reply data
*/
static void
clientReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue, DCB *backend_dcb)
static void clientReply(MXS_ROUTER* instance,
MXS_ROUTER_SESSION* router_session,
GWBUF* queue,
DCB* backend_dcb)
{
/** We should never end up here */
mxb_assert(false);
@ -234,10 +241,12 @@ clientReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *que
* @param succp Result of action: true iff router can continue
*
*/
static void
errorReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *message, DCB *backend_dcb,
mxs_error_action_t action,
bool *succp)
static void errorReply(MXS_ROUTER* instance,
MXS_ROUTER_SESSION* router_session,
GWBUF* message,
DCB* backend_dcb,
mxs_error_action_t action,
bool* succp)
{
/** We should never end up here */
mxb_assert(false);
@ -291,18 +300,19 @@ bool converter_func(Worker::Call::action_t action, Avro* router)
logged = true;
MXS_INFO("Stopped processing file %s at position %lu. Waiting until"
" more data is written before continuing.",
router->binlog_name.c_str(), router->current_pos);
router->binlog_name.c_str(),
router->current_pos);
}
return true;
}
class ConversionCtlTask: public Worker::DisposableTask
class ConversionCtlTask : public Worker::DisposableTask
{
public:
ConversionCtlTask(Avro* instance, bool start):
m_instance(instance),
m_start(start)
ConversionCtlTask(Avro* instance, bool start)
: m_instance(instance)
, m_start(start)
{
}
@ -325,14 +335,14 @@ private:
bool m_start;
};
static bool conversion_task_ctl(Avro *inst, bool start)
static bool conversion_task_ctl(Avro* inst, bool start)
{
bool rval = false;
if (!maxscale_is_shutting_down())
{
Worker* worker = static_cast<Worker*>(mxs_rworker_get(MXS_RWORKER_MAIN));
std::unique_ptr<ConversionCtlTask> task(new (std::nothrow) ConversionCtlTask(inst, start));
std::unique_ptr<ConversionCtlTask> task(new( std::nothrow) ConversionCtlTask(inst, start));
if (task.get())
{
@ -344,18 +354,18 @@ static bool conversion_task_ctl(Avro *inst, bool start)
return rval;
}
bool avro_handle_convert(const MODULECMD_ARG *args, json_t** output)
bool avro_handle_convert(const MODULECMD_ARG* args, json_t** output)
{
bool rval = false;
if (strcmp(args->argv[1].value.string, "start") == 0 &&
conversion_task_ctl((Avro*)args->argv[0].value.service->router_instance, true))
if (strcmp(args->argv[1].value.string, "start") == 0
&& conversion_task_ctl((Avro*)args->argv[0].value.service->router_instance, true))
{
MXS_NOTICE("Started conversion for service '%s'.", args->argv[0].value.service->name);
rval = true;
}
else if (strcmp(args->argv[1].value.string, "stop") == 0 &&
conversion_task_ctl((Avro*)args->argv[0].value.service->router_instance, false))
else if (strcmp(args->argv[1].value.string, "stop") == 0
&& conversion_task_ctl((Avro*)args->argv[0].value.service->router_instance, false))
{
MXS_NOTICE("Stopped conversion for service '%s'.", args->argv[0].value.service->name);
rval = true;
@ -406,7 +416,9 @@ static bool do_unlink_with_pattern(const char* format, ...)
else if (rc != GLOB_NOMATCH)
{
modulecmd_set_error("Failed to search '%s': %d, %s",
filename, errno, mxs_strerror(errno));
filename,
errno,
mxs_strerror(errno));
rval = false;
}
@ -415,7 +427,7 @@ static bool do_unlink_with_pattern(const char* format, ...)
return rval;
}
static bool avro_handle_purge(const MODULECMD_ARG *args, json_t** output)
static bool avro_handle_purge(const MODULECMD_ARG* args, json_t** output)
{
Avro* inst = (Avro*)args->argv[0].value.service->router_instance;
@ -423,9 +435,9 @@ static bool avro_handle_purge(const MODULECMD_ARG *args, json_t** output)
conversion_task_ctl(inst, false);
// Then delete the files
return do_unlink("%s/%s", inst->avrodir.c_str(), AVRO_PROGRESS_FILE) && // State file
do_unlink_with_pattern("/%s/*.avro", inst->avrodir.c_str()) && // .avro files
do_unlink_with_pattern("/%s/*.avsc", inst->avrodir.c_str()); // .avsc files
return do_unlink("%s/%s", inst->avrodir.c_str(), AVRO_PROGRESS_FILE) // State file
&& do_unlink_with_pattern("/%s/*.avro", inst->avrodir.c_str()) // .avro files
&& do_unlink_with_pattern("/%s/*.avsc", inst->avrodir.c_str()); // .avsc files
}
/**
@ -440,11 +452,17 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
{
static modulecmd_arg_type_t args_convert[] =
{
{ MODULECMD_ARG_SERVICE | MODULECMD_ARG_NAME_MATCHES_DOMAIN, "The avrorouter service" },
{ MODULECMD_ARG_STRING, "Action, whether to 'start' or 'stop' the conversion process" }
{MODULECMD_ARG_SERVICE | MODULECMD_ARG_NAME_MATCHES_DOMAIN,
"The avrorouter service" },
{MODULECMD_ARG_STRING,
"Action, whether to 'start' or 'stop' the conversion process" }
};
modulecmd_register_command(MXS_MODULE_NAME, "convert", MODULECMD_TYPE_ACTIVE,
avro_handle_convert, 2, args_convert,
modulecmd_register_command(MXS_MODULE_NAME,
"convert",
MODULECMD_TYPE_ACTIVE,
avro_handle_convert,
2,
args_convert,
"Start or stop the binlog to avro conversion process");
static modulecmd_arg_type_t args_purge[] =
@ -454,8 +472,12 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
"The avrorouter service to purge (NOTE: THIS REMOVES ALL CONVERTED FILES)"
}
};
modulecmd_register_command(MXS_MODULE_NAME, "purge", MODULECMD_TYPE_ACTIVE,
avro_handle_purge, 1, args_purge,
modulecmd_register_command(MXS_MODULE_NAME,
"purge",
MODULECMD_TYPE_ACTIVE,
avro_handle_purge,
1,
args_purge,
"Purge created Avro files and reset conversion state. "
"NOTE: MaxScale must be restarted after this call.");
@ -483,38 +505,48 @@ extern "C" MXS_MODULE* MXS_CREATE_MODULE()
"V1.0.0",
RCAP_TYPE_NO_RSESSION | RCAP_TYPE_NO_AUTH,
&MyObject,
NULL, /* Process init. */
NULL, /* Process finish. */
NULL, /* Thread init. */
NULL, /* Thread finish. */
NULL, /* Process init. */
NULL, /* Process finish. */
NULL, /* Thread init. */
NULL, /* Thread finish. */
{
{
"binlogdir",
MXS_MODULE_PARAM_PATH,
NULL,
MXS_MODULE_OPT_PATH_R_OK |
MXS_MODULE_OPT_PATH_W_OK |
MXS_MODULE_OPT_PATH_X_OK |
MXS_MODULE_OPT_PATH_CREAT
MXS_MODULE_OPT_PATH_R_OK
| MXS_MODULE_OPT_PATH_W_OK
| MXS_MODULE_OPT_PATH_X_OK
| MXS_MODULE_OPT_PATH_CREAT
},
{
"avrodir",
MXS_MODULE_PARAM_PATH,
MXS_DEFAULT_DATADIR,
MXS_MODULE_OPT_PATH_R_OK |
MXS_MODULE_OPT_PATH_W_OK |
MXS_MODULE_OPT_PATH_X_OK |
MXS_MODULE_OPT_PATH_CREAT
MXS_MODULE_OPT_PATH_R_OK
| MXS_MODULE_OPT_PATH_W_OK
| MXS_MODULE_OPT_PATH_X_OK
| MXS_MODULE_OPT_PATH_CREAT
},
{"source", MXS_MODULE_PARAM_SERVICE},
{"filestem", MXS_MODULE_PARAM_STRING, BINLOG_NAME_ROOT},
{"group_rows", MXS_MODULE_PARAM_COUNT, "1000"},
{"group_trx", MXS_MODULE_PARAM_COUNT, "1"},
{"start_index", MXS_MODULE_PARAM_COUNT, "1"},
{"block_size", MXS_MODULE_PARAM_SIZE, "0"},
{"codec", MXS_MODULE_PARAM_ENUM, "null", MXS_MODULE_OPT_ENUM_UNIQUE, codec_values},
{"match", MXS_MODULE_PARAM_REGEX},
{"exclude", MXS_MODULE_PARAM_REGEX},
{"source",
MXS_MODULE_PARAM_SERVICE },
{"filestem", MXS_MODULE_PARAM_STRING,
BINLOG_NAME_ROOT },
{"group_rows", MXS_MODULE_PARAM_COUNT,
"1000" },
{"group_trx", MXS_MODULE_PARAM_COUNT,
"1" },
{"start_index", MXS_MODULE_PARAM_COUNT,
"1" },
{"block_size", MXS_MODULE_PARAM_SIZE,
"0" },
{"codec", MXS_MODULE_PARAM_ENUM, "null",
MXS_MODULE_OPT_ENUM_UNIQUE,
codec_values },
{"match",
MXS_MODULE_PARAM_REGEX },
{"exclude",
MXS_MODULE_PARAM_REGEX },
{MXS_END_MODULE_PARAMS}
}
};

View File

@ -20,13 +20,13 @@
#include <signal.h>
#include <maxscale/utils.h>
#define WRITE_EVENT 0
#define UPDATE_EVENT 1
#define UPDATE_EVENT_AFTER 2
#define DELETE_EVENT 3
#define WRITE_EVENT 0
#define UPDATE_EVENT 1
#define UPDATE_EVENT_AFTER 2
#define DELETE_EVENT 3
static bool warn_decimal = false; /**< Remove when support for DECIMAL is added */
static bool warn_bit = false; /**< Remove when support for BIT is added */
static bool warn_decimal = false; /**< Remove when support for DECIMAL is added */
static bool warn_bit = false; /**< Remove when support for BIT is added */
static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET values
* larger than 255 is added */
@ -72,8 +72,11 @@ static int get_event_type(uint8_t event)
* @param metadata Field metadata
* @param value Pointer to the start of the in-memory representation of the data
*/
void set_numeric_field_value(SRowEventHandler& conv, int idx, uint8_t type,
uint8_t *metadata, uint8_t *value)
void set_numeric_field_value(SRowEventHandler& conv,
int idx,
uint8_t type,
uint8_t* metadata,
uint8_t* value)
{
switch (type)
{
@ -86,7 +89,7 @@ void set_numeric_field_value(SRowEventHandler& conv, int idx, uint8_t type,
case TABLE_COL_TYPE_SHORT:
{
short s = gw_mysql_get_byte2(value);
short s = gw_mysql_get_byte2(value);
conv->column(idx, s);
break;
}
@ -147,7 +150,7 @@ void set_numeric_field_value(SRowEventHandler& conv, int idx, uint8_t type,
* @param current_column Zero indexed column number
* @return True if the bit is set
*/
static bool bit_is_set(uint8_t *ptr, int columns, int current_column)
static bool bit_is_set(uint8_t* ptr, int columns, int current_column)
{
if (current_column >= 8)
{
@ -155,7 +158,7 @@ static bool bit_is_set(uint8_t *ptr, int columns, int current_column)
current_column = current_column % 8;
}
return ((*ptr) & (1 << current_column));
return (*ptr) & (1 << current_column);
}
/**
@ -192,17 +195,18 @@ int get_metadata_len(uint8_t type)
}
// Make sure that both `i` and `trace` are defined before using this macro
#define check_overflow(t) do \
#define check_overflow(t) \
do \
{ \
if (!(t)) \
{ \
for (long x = 0; x < i;x++) \
for (long x = 0; x < i; x++) \
{ \
MXS_ALERT("%s", trace[x]); \
} \
raise(SIGABRT); \
} \
}while(false)
} while (false)
// Debug function for checking whether a row event consists of only NULL values
static bool all_fields_null(uint8_t* null_bitmap, int ncolumns)
@ -232,13 +236,16 @@ static bool all_fields_null(uint8_t* null_bitmap, int ncolumns)
* this row event. Currently this should be a bitfield which has all bits set.
* @return Pointer to the first byte after the current row event
*/
uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create,
SRowEventHandler& conv, uint8_t *ptr,
uint8_t *columns_present, uint8_t *end)
uint8_t* process_row_event_data(STableMapEvent map,
STableCreateEvent create,
SRowEventHandler& conv,
uint8_t* ptr,
uint8_t* columns_present,
uint8_t* end)
{
int npresent = 0;
long ncolumns = map->columns();
uint8_t *metadata = &map->column_metadata[0];
uint8_t* metadata = &map->column_metadata[0];
size_t metadata_offset = 0;
/** BIT type values use the extra bits in the row event header */
@ -246,7 +253,7 @@ uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create,
mxb_assert(ptr < end);
/** Store the null value bitmap */
uint8_t *null_bitmap = ptr;
uint8_t* null_bitmap = ptr;
ptr += (ncolumns + 7) / 8;
mxb_assert(ptr < end || (bit_is_set(null_bitmap, ncolumns, 0)));
@ -386,9 +393,11 @@ uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create,
{
char buf[80];
struct tm tm;
ptr += unpack_temporal_value(map->column_types[i], ptr,
ptr += unpack_temporal_value(map->column_types[i],
ptr,
&metadata[metadata_offset],
create->columns[i].length, &tm);
create->columns[i].length,
&tm);
format_temporal_value(buf, sizeof(buf), map->column_types[i], &tm);
conv->column(i, buf);
sprintf(trace[i], "[%ld] %s: %s", i, column_type_to_string(map->column_types[i]), buf);
@ -399,8 +408,10 @@ uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create,
{
uint8_t lval[16];
memset(lval, 0, sizeof(lval));
ptr += unpack_numeric_field(ptr, map->column_types[i],
&metadata[metadata_offset], lval);
ptr += unpack_numeric_field(ptr,
map->column_types[i],
&metadata[metadata_offset],
lval);
set_numeric_field_value(conv, i, map->column_types[i], &metadata[metadata_offset], lval);
sprintf(trace[i], "[%ld] %s", i, column_type_to_string(map->column_types[i]));
check_overflow(ptr <= end);
@ -427,7 +438,7 @@ uint8_t* process_row_event_data(STableMapEvent map, STableCreateEvent create,
* @param dest Destination where the string is stored
* @param len Size of destination
*/
void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *tbl_id, char* dest, size_t len)
void read_table_info(uint8_t* ptr, uint8_t post_header_len, uint64_t* tbl_id, char* dest, size_t len)
{
uint64_t table_id = 0;
size_t id_size = post_header_len == 6 ? 4 : 6;
@ -464,7 +475,7 @@ void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *tbl_id, ch
* @param hdr Replication header
* @param ptr Pointer to event payload
*/
bool Rpl::handle_table_map_event(REP_HEADER *hdr, uint8_t *ptr)
bool Rpl::handle_table_map_event(REP_HEADER* hdr, uint8_t* ptr)
{
bool rval = false;
uint64_t id;
@ -490,8 +501,8 @@ bool Rpl::handle_table_map_event(REP_HEADER *hdr, uint8_t *ptr)
{
auto old = it->second;
if (old->id == map->id && old->version == map->version &&
old->table == map->table && old->database == map->database)
if (old->id == map->id && old->version == map->version
&& old->table == map->table && old->database == map->database)
{
// We can reuse the table map object
return true;
@ -521,7 +532,8 @@ bool Rpl::handle_table_map_event(REP_HEADER *hdr, uint8_t *ptr)
{
MXS_WARNING("Table map event for table '%s' read before the DDL statement "
"for that table was read. Data will not be processed for this "
"table until a DDL statement for it is read.", table_ident);
"table until a DDL statement for it is read.",
table_ident);
}
return rval;
@ -538,10 +550,10 @@ bool Rpl::handle_table_map_event(REP_HEADER *hdr, uint8_t *ptr)
* @param ptr Pointer to the start of the event
* @return True on succcess, false on error
*/
bool Rpl::handle_row_event(REP_HEADER *hdr, uint8_t *ptr)
bool Rpl::handle_row_event(REP_HEADER* hdr, uint8_t* ptr)
{
bool rval = false;
uint8_t *end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
uint8_t* end = ptr + hdr->event_size - BINLOG_EVENT_HDR_LEN;
uint8_t table_id_size = m_event_type_hdr_lens[hdr->event_type] == 6 ? 4 : 6;
uint64_t table_id = 0;
@ -589,8 +601,8 @@ bool Rpl::handle_row_event(REP_HEADER *hdr, uint8_t *ptr)
* used to calculate a "delta" of sorts if necessary. Currently we store
* both the before and the after images. */
uint8_t col_update[coldata_size];
if (hdr->event_type == UPDATE_ROWS_EVENTv1 ||
hdr->event_type == UPDATE_ROWS_EVENTv2)
if (hdr->event_type == UPDATE_ROWS_EVENTv1
|| hdr->event_type == UPDATE_ROWS_EVENTv2)
{
memcpy(&col_update, ptr, coldata_size);
ptr += coldata_size;
@ -614,8 +626,8 @@ bool Rpl::handle_row_event(REP_HEADER *hdr, uint8_t *ptr)
bool ok = m_handler->prepare_table(map->database, map->table);
auto create = m_created_tables.find(table_ident);
if (ok && create != m_created_tables.end() &&
ncolumns == map->columns() && create->second->columns.size() == map->columns())
if (ok && create != m_created_tables.end()
&& ncolumns == map->columns() && create->second->columns.size() == map->columns())
{
/** Each event has one or more rows in it. The number of rows is not known
* beforehand so we must continue processing them until we reach the end
@ -652,31 +664,39 @@ bool Rpl::handle_row_event(REP_HEADER *hdr, uint8_t *ptr)
else if (!ok)
{
MXS_ERROR("Avro file handle was not found for table %s.%s. See earlier"
" errors for more details.", map->database.c_str(), map->table.c_str());
" errors for more details.",
map->database.c_str(),
map->table.c_str());
}
else if (create == m_created_tables.end())
{
MXS_ERROR("Create table statement for %s.%s was not found from the "
"binary logs or the stored schema was not correct.",
map->database.c_str(), map->table.c_str());
map->database.c_str(),
map->table.c_str());
}
else if (ncolumns == map->columns() && create->second->columns.size() != map->columns())
{
MXS_ERROR("Table map event has a different column count for table "
"%s.%s than the CREATE TABLE statement. Possible "
"unsupported DDL detected.", map->database.c_str(), map->table.c_str());
"unsupported DDL detected.",
map->database.c_str(),
map->table.c_str());
}
else
{
MXS_ERROR("Row event and table map event have different column "
"counts for table %s.%s, only full row image is currently "
"supported.", map->database.c_str(), map->table.c_str());
"supported.",
map->database.c_str(),
map->table.c_str());
}
}
else
{
MXS_INFO("Row event for unknown table mapped to ID %lu. Data will not "
"be processed.", table_id);
"be processed.",
table_id);
}
return rval;
@ -692,7 +712,7 @@ bool Rpl::handle_row_event(REP_HEADER *hdr, uint8_t *ptr)
bool is_create_table_statement(pcre2_code* create_table_re, char* ptr, size_t len)
{
int rc = 0;
pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(create_table_re, NULL);
pcre2_match_data* mdata = pcre2_match_data_create_from_pattern(create_table_re, NULL);
if (mdata)
{
@ -719,17 +739,17 @@ bool is_create_as_statement(const char* ptr, size_t len)
char sql[len + 1];
memcpy(sql, ptr, len);
sql[len] = '\0';
const char* pattern =
// Case-insensitive mode
"(?i)"
// Main CREATE TABLE part (the \s is for any whitespace)
"create\\stable\\s"
// Optional IF NOT EXISTS
"(if\\snot\\sexists\\s)?"
// The table name with optional database name, both enclosed in optional backticks
"(`?\\S+`?.)`?\\S+`?\\s"
// And finally the AS keyword
"as";
const char* pattern
= // Case-insensitive mode
"(?i)"
// Main CREATE TABLE part (the \s is for any whitespace)
"create\\stable\\s"
// Optional IF NOT EXISTS
"(if\\snot\\sexists\\s)?"
// The table name with optional database name, both enclosed in optional backticks
"(`?\\S+`?.)`?\\S+`?\\s"
// And finally the AS keyword
"as";
return mxs_pcre2_simple_match(pattern, sql, 0, &err) == MXS_PCRE2_MATCH;
}
@ -744,7 +764,7 @@ bool is_create_as_statement(const char* ptr, size_t len)
bool is_alter_table_statement(pcre2_code* alter_table_re, char* ptr, size_t len)
{
int rc = 0;
pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(alter_table_re, NULL);
pcre2_match_data* mdata = pcre2_match_data_create_from_pattern(alter_table_re, NULL);
if (mdata)
{
@ -792,7 +812,7 @@ bool Rpl::save_and_replace_table_create(STableCreateEvent created)
return m_handler->create_table(created);
}
void unify_whitespace(char *sql, int len)
void unify_whitespace(char* sql, int len)
{
for (int i = 0; i < len; i++)
{
@ -813,7 +833,7 @@ void unify_whitespace(char *sql, int len)
* @param len Pointer to current length of string, updated to new length if
* @c sql is modified
*/
static void strip_executable_comments(char *sql, int* len)
static void strip_executable_comments(char* sql, int* len)
{
if (strncmp(sql, "/*!", 3) == 0 || strncmp(sql, "/*M!", 4) == 0)
{
@ -845,18 +865,18 @@ static void strip_executable_comments(char *sql, int* len)
* @param pending_transaction Pointer where status of pending transaction is stored
* @param ptr Pointer to the start of the event payload
*/
void Rpl::handle_query_event(REP_HEADER *hdr, uint8_t *ptr)
void Rpl::handle_query_event(REP_HEADER* hdr, uint8_t* ptr)
{
int dblen = ptr[DBNM_OFF];
int vblklen = gw_mysql_get_byte2(ptr + VBLK_OFF);
int len = hdr->event_size - BINLOG_EVENT_HDR_LEN - (PHDR_OFF + vblklen + 1 + dblen);
char *sql = (char *) ptr + PHDR_OFF + vblklen + 1 + dblen;
char* sql = (char*) ptr + PHDR_OFF + vblklen + 1 + dblen;
char db[dblen + 1];
memcpy(db, (char*) ptr + PHDR_OFF + vblklen, dblen);
db[dblen] = 0;
size_t sqlsz = len, tmpsz = len;
char *tmp = static_cast<char*>(MXS_MALLOC(len + 1));
char* tmp = static_cast<char*>(MXS_MALLOC(len + 1));
MXS_ABORT_IF_NULL(tmp);
remove_mysql_comments((const char**)&sql, &sqlsz, &tmp, &tmpsz);
sql = tmp;
@ -906,7 +926,9 @@ void Rpl::handle_query_event(REP_HEADER *hdr, uint8_t *ptr)
static bool warn_create_as = true;
if (warn_create_as)
{
MXS_WARNING("`CREATE TABLE AS` is not yet supported, ignoring events to this table: %.*s", len, sql);
MXS_WARNING("`CREATE TABLE AS` is not yet supported, ignoring events to this table: %.*s",
len,
sql);
warn_create_as = false;
}
}
@ -962,8 +984,8 @@ void Rpl::handle_event(REP_HEADER hdr, uint8_t* ptr)
{
handle_table_map_event(&hdr, ptr);
}
else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1) ||
(hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2))
else if ((hdr.event_type >= WRITE_ROWS_EVENTv0 && hdr.event_type <= DELETE_ROWS_EVENTv1)
|| (hdr.event_type >= WRITE_ROWS_EVENTv2 && hdr.event_type <= DELETE_ROWS_EVENTv2))
{
handle_row_event(&hdr, ptr);
}

View File

@ -37,9 +37,9 @@
*/
static inline bool not_generated_field(const char* name)
{
return strcmp(name, avro_domain) && strcmp(name, avro_server_id) &&
strcmp(name, avro_sequence) && strcmp(name, avro_event_number) &&
strcmp(name, avro_event_type) && strcmp(name, avro_timestamp);
return strcmp(name, avro_domain) && strcmp(name, avro_server_id)
&& strcmp(name, avro_sequence) && strcmp(name, avro_event_number)
&& strcmp(name, avro_event_type) && strcmp(name, avro_timestamp);
}
/**
@ -58,7 +58,7 @@ bool json_extract_field_names(const char* filename, std::vector<Column>& columns
bool rval = false;
json_error_t err;
err.text[0] = '\0';
json_t *obj, *arr;
json_t* obj, * arr;
if ((obj = json_load_file(filename, 0, &err)) && (arr = json_object_get(obj, "fields")))
{
@ -74,11 +74,11 @@ bool json_extract_field_names(const char* filename, std::vector<Column>& columns
if (json_is_object(val))
{
json_t *name = json_object_get(val, "name");
json_t* name = json_object_get(val, "name");
if (name && json_is_string(name))
{
const char *name_str = json_string_value(name);
const char* name_str = json_string_value(name);
if (not_generated_field(name_str))
{
@ -108,14 +108,16 @@ bool json_extract_field_names(const char* filename, std::vector<Column>& columns
else
{
MXS_ERROR("JSON value for \"name\" was not a string in "
"file '%s'.", filename);
"file '%s'.",
filename);
rval = false;
}
}
else
{
MXS_ERROR("JSON value for \"fields\" was not an array of objects in "
"file '%s'.", filename);
"file '%s'.",
filename);
rval = false;
}
}
@ -128,22 +130,25 @@ bool json_extract_field_names(const char* filename, std::vector<Column>& columns
}
else
{
MXS_ERROR("Failed to load JSON from file '%s': %s", filename,
MXS_ERROR("Failed to load JSON from file '%s': %s",
filename,
obj && !arr ? "No 'fields' value in object." : err.text);
}
return rval;
}
TableCreateEvent* table_create_from_schema(const char* file, const char* db,
const char* table, int version)
TableCreateEvent* table_create_from_schema(const char* file,
const char* db,
const char* table,
int version)
{
TableCreateEvent* newtable = NULL;
std::vector<Column> columns;
if (json_extract_field_names(file, columns))
{
newtable = new (std::nothrow)TableCreateEvent(db, table, version, std::move(columns));
newtable = new( std::nothrow) TableCreateEvent(db, table, version, std::move(columns));
}
return newtable;

View File

@ -10,7 +10,7 @@
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#pragma once
#pragma once
#define MXS_MODULE_NAME "avrorouter"
@ -38,28 +38,28 @@ MXS_BEGIN_DECLS
/** Name of the file where the binlog to Avro conversion progress is stored */
#define AVRO_PROGRESS_FILE "avro-conversion.ini"
static const char *avro_client_states[] = { "Unregistered", "Registered", "Processing", "Errored" };
static const char *avro_client_client_mode[] = { "Catch-up", "Busy", "Wait_for_data" };
static const char* avro_client_states[] = {"Unregistered", "Registered", "Processing", "Errored"};
static const char* avro_client_client_mode[] = {"Catch-up", "Busy", "Wait_for_data"};
static const char *avro_domain = "domain";
static const char *avro_server_id = "server_id";
static const char *avro_sequence = "sequence";
static const char *avro_event_number = "event_number";
static const char *avro_event_type = "event_type";
static const char *avro_timestamp = "timestamp";
static const char *avro_client_ouput[] = { "Undefined", "JSON", "Avro" };
static const char* avro_domain = "domain";
static const char* avro_server_id = "server_id";
static const char* avro_sequence = "sequence";
static const char* avro_event_number = "event_number";
static const char* avro_event_type = "event_type";
static const char* avro_timestamp = "timestamp";
static const char* avro_client_ouput[] = {"Undefined", "JSON", "Avro"};
static inline bool is_reserved_word(const char* word)
{
return strcasecmp(word, avro_domain) == 0 ||
strcasecmp(word, avro_server_id) == 0 ||
strcasecmp(word, avro_sequence) == 0 ||
strcasecmp(word, avro_event_number) == 0 ||
strcasecmp(word, avro_event_type) == 0 ||
strcasecmp(word, avro_timestamp) == 0;
return strcasecmp(word, avro_domain) == 0
|| strcasecmp(word, avro_server_id) == 0
|| strcasecmp(word, avro_sequence) == 0
|| strcasecmp(word, avro_event_number) == 0
|| strcasecmp(word, avro_event_type) == 0
|| strcasecmp(word, avro_timestamp) == 0;
}
static inline void fix_reserved_word(char *tok)
static inline void fix_reserved_word(char* tok)
{
if (is_reserved_word(tok))
{
@ -105,15 +105,15 @@ enum mxs_avro_codec_type
static const MXS_ENUM_VALUE codec_values[] =
{
{"null", MXS_AVRO_CODEC_NULL},
{"deflate", MXS_AVRO_CODEC_DEFLATE},
{"null", MXS_AVRO_CODEC_NULL },
{"deflate", MXS_AVRO_CODEC_DEFLATE },
// Not yet implemented
// {"snappy", MXS_AVRO_CODEC_SNAPPY},
{NULL}
};
class Avro: public MXS_ROUTER
class Avro : public MXS_ROUTER
{
Avro(const Avro&) = delete;
Avro& operator=(const Avro&) = delete;
@ -121,26 +121,26 @@ class Avro: public MXS_ROUTER
public:
static Avro* create(SERVICE* service, SRowEventHandler handler);
SERVICE* service; /*< Pointer to the service using this router */
std::string filestem; /*< Root of binlog filename */
std::string binlogdir; /*< The directory where the binlog files are stored */
std::string avrodir; /*< The directory with the AVRO files */
std::string binlog_name; /*< Name of the current binlog file */
uint64_t current_pos; /*< Current binlog position */
int binlog_fd; /*< File descriptor of the binlog file being read */
uint64_t trx_count; /*< Transactions processed */
uint64_t trx_target; /*< Number of transactions that trigger a flush */
uint64_t row_count; /*< Row events processed */
uint64_t row_target; /*< Number of row events that trigger a flush */
uint32_t task_handle; /**< Delayed task handle */
Rpl handler;
SERVICE* service; /*< Pointer to the service using this router */
std::string filestem; /*< Root of binlog filename */
std::string binlogdir; /*< The directory where the binlog files are stored */
std::string avrodir; /*< The directory with the AVRO files */
std::string binlog_name;/*< Name of the current binlog file */
uint64_t current_pos;/*< Current binlog position */
int binlog_fd; /*< File descriptor of the binlog file being read */
uint64_t trx_count; /*< Transactions processed */
uint64_t trx_target; /*< Number of transactions that trigger a flush */
uint64_t row_count; /*< Row events processed */
uint64_t row_target; /*< Number of row events that trigger a flush */
uint32_t task_handle;/**< Delayed task handle */
Rpl handler;
private:
Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, SERVICE* source, SRowEventHandler handler);
void read_source_service_options(SERVICE* source);
};
class AvroSession: public MXS_ROUTER_SESSION
class AvroSession : public MXS_ROUTER_SESSION
{
AvroSession(const AvroSession&) = delete;
AvroSession& operator=(const AvroSession&) = delete;
@ -149,19 +149,19 @@ public:
static AvroSession* create(Avro* router, MXS_SESSION* session);
~AvroSession();
DCB* dcb; /*< The client DCB */
int state; /*< The state of this client */
enum avro_data_format format; /*< Stream JSON or Avro data */
std::string uuid; /*< Client UUID */
SPINLOCK catch_lock; /*< Event catchup lock */
Avro* router; /*< Pointer to the owning router */
MAXAVRO_FILE* file_handle; /*< Current open file handle */
uint64_t last_sent_pos; /*< The last record we sent */
DCB* dcb; /*< The client DCB */
int state; /*< The state of this client */
enum avro_data_format format; /*< Stream JSON or Avro data */
std::string uuid; /*< Client UUID */
SPINLOCK catch_lock; /*< Event catchup lock */
Avro* router; /*< Pointer to the owning router */
MAXAVRO_FILE* file_handle; /*< Current open file handle */
uint64_t last_sent_pos;/*< The last record we sent */
time_t connect_time; /*< Connect time of slave */
std::string avro_binfile;
bool requested_gtid; /*< If the client requested */
gtid_pos_t gtid; /*< Current/requested GTID */
gtid_pos_t gtid_start; /*< First sent GTID */
bool requested_gtid; /*< If the client requested */
gtid_pos_t gtid; /*< Current/requested GTID */
gtid_pos_t gtid_start; /*< First sent GTID */
/**
* Process a client request
@ -180,10 +180,10 @@ public:
private:
AvroSession(Avro* instance, MXS_SESSION* session);
int do_registration(GWBUF *data);
void process_command(GWBUF *queue);
void send_gtid_info(gtid_pos_t *gtid_pos);
void set_current_gtid(json_t *row);
int do_registration(GWBUF* data);
void process_command(GWBUF* queue);
void send_gtid_info(gtid_pos_t* gtid_pos);
void set_current_gtid(json_t* row);
bool stream_json();
bool stream_binary();
bool seek_to_gtid();
@ -191,26 +191,36 @@ private:
void rotate_avro_file(std::string fullname);
};
void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id, char* dest, size_t len);
TableMapEvent *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TableCreateEvent* create);
void read_table_info(uint8_t* ptr,
uint8_t post_header_len,
uint64_t* table_id,
char* dest,
size_t len);
TableMapEvent* table_map_alloc(uint8_t* ptr, uint8_t hdr_len, TableCreateEvent* create);
STableCreateEvent table_create_alloc(char* ident, const char* sql, int len);
bool table_create_save(TableCreateEvent *create, const char *filename);
bool table_create_alter(TableCreateEvent *create, const char *sql, const char *end);
TableCreateEvent* table_create_from_schema(const char* file, const char* db, const char* table,
bool table_create_save(TableCreateEvent* create, const char* filename);
bool table_create_alter(TableCreateEvent* create, const char* sql, const char* end);
TableCreateEvent* table_create_from_schema(const char* file,
const char* db,
const char* table,
int version);
void read_table_identifier(const char* db, const char *sql, const char *end, char *dest, int size);
int avro_client_handle_request(Avro *, AvroSession *, GWBUF *);
void avro_client_rotate(Avro *router, AvroSession *client, uint8_t *ptr);
bool avro_open_binlog(const char *binlogdir, const char *file, int *fd);
void avro_close_binlog(int fd);
avro_binlog_end_t avro_read_all_events(Avro *router);
char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create);
bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos);
REP_HEADER construct_header(uint8_t* ptr);
bool avro_save_conversion_state(Avro *router);
bool avro_load_conversion_state(Avro *router);
void avro_load_metadata_from_schemas(Avro *router);
void notify_all_clients(Avro *router);
void read_table_identifier(const char* db,
const char* sql,
const char* end,
char* dest,
int size);
int avro_client_handle_request(Avro*, AvroSession*, GWBUF*);
void avro_client_rotate(Avro* router, AvroSession* client, uint8_t* ptr);
bool avro_open_binlog(const char* binlogdir, const char* file, int* fd);
void avro_close_binlog(int fd);
avro_binlog_end_t avro_read_all_events(Avro* router);
char* json_new_schema_from_table(const STableMapEvent& map, const STableCreateEvent& create);
bool handle_row_event(Avro* router, REP_HEADER* hdr, uint8_t* ptr);
void handle_one_event(Avro* router, uint8_t* ptr, REP_HEADER& hdr, uint64_t& pos);
REP_HEADER construct_header(uint8_t* ptr);
bool avro_save_conversion_state(Avro* router);
bool avro_load_conversion_state(Avro* router);
void avro_load_metadata_from_schemas(Avro* router);
void notify_all_clients(Avro* router);
MXS_END_DECLS

View File

@ -35,10 +35,10 @@ bool gtid_pos_t::parse(const char* str)
bool rval = false;
char buf[strlen(str) + 1];
strcpy(buf, str);
char *saved, *dom = strtok_r(buf, ":-\n", &saved);
char *serv_id = strtok_r(NULL, ":-\n", &saved);
char *sequence = strtok_r(NULL, ":-\n", &saved);
char *subseq = strtok_r(NULL, ":-\n", &saved);
char* saved, * dom = strtok_r(buf, ":-\n", &saved);
char* serv_id = strtok_r(NULL, ":-\n", &saved);
char* sequence = strtok_r(NULL, ":-\n", &saved);
char* subseq = strtok_r(NULL, ":-\n", &saved);
if (dom && serv_id && sequence)
{
@ -86,11 +86,12 @@ Column Column::from_json(json_t* json)
json_t* type = json_object_get(json, "type");
json_t* length = json_object_get(json, "length");
if (name && json_is_string(name) &&
type && json_is_string(type) &&
length && json_is_integer(length))
if (name && json_is_string(name)
&& type && json_is_string(type)
&& length && json_is_integer(length))
{
return Column(json_string_value(name), json_string_value(type),
return Column(json_string_value(name),
json_string_value(type),
json_integer_value(length));
}
@ -124,8 +125,8 @@ STableCreateEvent TableCreateEvent::from_json(json_t* obj)
json_t* version = json_object_get(obj, "version");
json_t* columns = json_object_get(obj, "columns");
if (json_is_string(table) && json_is_string(database) &&
json_is_integer(version) && json_is_array(columns))
if (json_is_string(table) && json_is_string(database)
&& json_is_integer(version) && json_is_array(columns))
{
std::string tbl = json_string_value(table);
std::string db = json_string_value(database);
@ -139,10 +140,9 @@ STableCreateEvent TableCreateEvent::from_json(json_t* obj)
cols.emplace_back(Column::from_json(val));
}
auto is_empty = [](const Column & col)
{
return col.name.empty();
};
auto is_empty = [](const Column& col) {
return col.name.empty();
};
if (std::none_of(cols.begin(), cols.end(), is_empty))
{
@ -160,11 +160,11 @@ STableCreateEvent TableCreateEvent::from_json(json_t* obj)
* @return Pointer to the start of the definition of NULL if the query is
* malformed.
*/
static const char* get_table_definition(const char *sql, int len, int* size)
static const char* get_table_definition(const char* sql, int len, int* size)
{
const char *rval = NULL;
const char *ptr = sql;
const char *end = sql + len;
const char* rval = NULL;
const char* ptr = sql;
const char* end = sql + len;
while (ptr < end && *ptr != '(')
{
ptr++;
@ -175,7 +175,7 @@ static const char* get_table_definition(const char *sql, int len, int* size)
{
int depth = 0;
ptr++;
const char *start = ptr; // Skip first parenthesis
const char* start = ptr; // Skip first parenthesis
while (ptr < end)
{
switch (*ptr)
@ -350,7 +350,7 @@ const char* next_field_definition(const char* ptr)
return ptr;
}
static const char *extract_field_name(const char* ptr, char* dest, size_t size)
static const char* extract_field_name(const char* ptr, char* dest, size_t size)
{
bool bt = false;
@ -365,17 +365,17 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
if (!bt)
{
if (strncasecmp(ptr, "constraint", 10) == 0 || strncasecmp(ptr, "index", 5) == 0 ||
strncasecmp(ptr, "key", 3) == 0 || strncasecmp(ptr, "fulltext", 8) == 0 ||
strncasecmp(ptr, "spatial", 7) == 0 || strncasecmp(ptr, "foreign", 7) == 0 ||
strncasecmp(ptr, "unique", 6) == 0 || strncasecmp(ptr, "primary", 7) == 0)
if (strncasecmp(ptr, "constraint", 10) == 0 || strncasecmp(ptr, "index", 5) == 0
|| strncasecmp(ptr, "key", 3) == 0 || strncasecmp(ptr, "fulltext", 8) == 0
|| strncasecmp(ptr, "spatial", 7) == 0 || strncasecmp(ptr, "foreign", 7) == 0
|| strncasecmp(ptr, "unique", 6) == 0 || strncasecmp(ptr, "primary", 7) == 0)
{
// Found a keyword
return NULL;
}
}
const char *start = ptr;
const char* start = ptr;
if (!bt)
{
@ -411,7 +411,7 @@ static const char *extract_field_name(const char* ptr, char* dest, size_t size)
return ptr;
}
int extract_type_length(const char* ptr, char *dest)
int extract_type_length(const char* ptr, char* dest)
{
/** Skip any leading whitespace */
while (*ptr && (isspace(*ptr) || *ptr == '`'))
@ -420,7 +420,7 @@ int extract_type_length(const char* ptr, char *dest)
}
/** The field type definition starts here */
const char *start = ptr;
const char* start = ptr;
/** Skip characters until we either hit a whitespace character or the start
* of the length definition. */
@ -443,13 +443,13 @@ int extract_type_length(const char* ptr, char *dest)
ptr++;
}
int rval = -1; // No length defined
int rval = -1; // No length defined
/** Start of length definition */
if (*ptr == '(')
{
ptr++;
char *end;
char* end;
int val = strtol(ptr, &end, 10);
if (*end == ')')
@ -479,7 +479,7 @@ int count_columns(const char* ptr)
* @param nameptr table definition
* @return Number of processed columns or -1 on error
*/
static void process_column_definition(const char *nameptr, std::vector<Column>& columns)
static void process_column_definition(const char* nameptr, std::vector<Column>& columns)
{
char colname[512];
@ -541,7 +541,7 @@ STableCreateEvent table_create_alloc(char* ident, const char* sql, int len)
if (!columns.empty())
{
int version = resolve_table_version(database, table);
rval.reset(new (std::nothrow) TableCreateEvent(database, table, version, std::move(columns)));
rval.reset(new( std::nothrow) TableCreateEvent(database, table, version, std::move(columns)));
}
else
{
@ -560,7 +560,7 @@ STableCreateEvent table_create_alloc(char* ident, const char* sql, int len)
* @param post_header_len Length of the event specific header, 8 or 6 bytes
* @return New TABLE_MAP or NULL if memory allocation failed
*/
TableMapEvent *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TableCreateEvent* create)
TableMapEvent* table_map_alloc(uint8_t* ptr, uint8_t hdr_len, TableCreateEvent* create)
{
uint64_t table_id = 0;
size_t id_size = hdr_len == 6 ? 4 : 6;
@ -589,45 +589,60 @@ TableMapEvent *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TableCreateEvent*
ptr += mxs_leint_bytes(ptr);
/** Column types */
uint8_t *column_types = ptr;
uint8_t* column_types = ptr;
ptr += column_count;
size_t metadata_size = 0;
uint8_t* metadata = (uint8_t*)mxs_lestr_consume(&ptr, &metadata_size);
uint8_t *nullmap = ptr;
uint8_t* nullmap = ptr;
size_t nullmap_size = (column_count + 7) / 8;
Bytes cols(column_types, column_types + column_count);
Bytes nulls(nullmap, nullmap + nullmap_size);
Bytes meta(metadata, metadata + metadata_size);
return new (std::nothrow)TableMapEvent(schema_name, table_name, table_id, create->version,
std::move(cols), std::move(nulls), std::move(meta));
return new( std::nothrow) TableMapEvent(schema_name,
table_name,
table_id,
create->version,
std::move(cols),
std::move(nulls),
std::move(meta));
}
Rpl::Rpl(SERVICE* service, SRowEventHandler handler, pcre2_code* match, pcre2_code* exclude,
gtid_pos_t gtid):
m_handler(handler),
m_service(service),
m_binlog_checksum(0),
m_event_types(0),
m_gtid(gtid),
m_match(match),
m_exclude(exclude),
m_md_match(m_match ? pcre2_match_data_create_from_pattern(m_match, NULL) : nullptr),
m_md_exclude(m_exclude ? pcre2_match_data_create_from_pattern(m_exclude, NULL) : nullptr)
Rpl::Rpl(SERVICE* service,
SRowEventHandler handler,
pcre2_code* match,
pcre2_code* exclude,
gtid_pos_t gtid)
: m_handler(handler)
, m_service(service)
, m_binlog_checksum(0)
, m_event_types(0)
, m_gtid(gtid)
, m_match(match)
, m_exclude(exclude)
, m_md_match(m_match ? pcre2_match_data_create_from_pattern(m_match, NULL) : nullptr)
, m_md_exclude(m_exclude ? pcre2_match_data_create_from_pattern(m_exclude, NULL) : nullptr)
{
/** For detection of CREATE/ALTER TABLE statements */
static const char* create_table_regex = "(?i)create[a-z0-9[:space:]_]+table";
static const char* alter_table_regex = "(?i)alter[[:space:]]+table";
int pcreerr;
size_t erroff;
m_create_table_re = pcre2_compile((PCRE2_SPTR) create_table_regex, PCRE2_ZERO_TERMINATED,
0, &pcreerr, &erroff, NULL);
m_alter_table_re = pcre2_compile((PCRE2_SPTR) alter_table_regex, PCRE2_ZERO_TERMINATED,
0, &pcreerr, &erroff, NULL);
m_create_table_re = pcre2_compile((PCRE2_SPTR) create_table_regex,
PCRE2_ZERO_TERMINATED,
0,
&pcreerr,
&erroff,
NULL);
m_alter_table_re = pcre2_compile((PCRE2_SPTR) alter_table_regex,
PCRE2_ZERO_TERMINATED,
0,
&pcreerr,
&erroff,
NULL);
mxb_assert_message(m_create_table_re && m_alter_table_re,
"CREATE TABLE and ALTER TABLE regex compilation should not fail");
"CREATE TABLE and ALTER TABLE regex compilation should not fail");
}
void Rpl::flush()
@ -849,7 +864,7 @@ STableCreateEvent Rpl::table_create_copy(const char* sql, size_t len, const char
if (it != m_created_tables.end())
{
rval.reset(new (std::nothrow) TableCreateEvent(*it->second));
rval.reset(new( std::nothrow) TableCreateEvent(*it->second));
char* table = strchr(target, '.');
table = table ? table + 1 : target;
rval->table = table;
@ -859,7 +874,10 @@ STableCreateEvent Rpl::table_create_copy(const char* sql, size_t len, const char
else
{
MXS_ERROR("Could not find table '%s' that '%s' is being created from: %.*s",
table_ident, target, (int)len, sql);
table_ident,
target,
(int)len,
sql);
}
}
@ -891,7 +909,7 @@ static const char* get_next_def(const char* sql, const char* end)
static const char* get_tok(const char* sql, int* toklen, const char* end)
{
const char *start = sql;
const char* start = sql;
while (isspace(*start))
{
@ -997,7 +1015,7 @@ static bool get_placement_specifier(const char* sql, const char* end, const char
else
{
// Something else, possibly FIRST or un-backtick'd AFTER
const char* id_end = end + 1; // Points to either a trailing space or one-after-the-end
const char* id_end = end + 1; // Points to either a trailing space or one-after-the-end
rskip_token(sql, &end);
// end points to the character _before_ the token
@ -1032,7 +1050,7 @@ static bool get_placement_specifier(const char* sql, const char* end, const char
return rval;
}
static bool tok_eq(const char *a, const char *b, size_t len)
static bool tok_eq(const char* a, const char* b, size_t len)
{
size_t i = 0;
@ -1111,7 +1129,7 @@ static bool token_is_keyword(const char* tok, int len)
return false;
}
void read_table_identifier(const char* db, const char *sql, const char *end, char *dest, int size)
void read_table_identifier(const char* db, const char* sql, const char* end, char* dest, int size)
{
const char* start;
int len = 0;
@ -1119,7 +1137,7 @@ void read_table_identifier(const char* db, const char *sql, const char *end, cha
while (is_keyword)
{
skip_whitespace(&sql); // Leading whitespace
skip_whitespace(&sql); // Leading whitespace
if (*sql == '`')
{
@ -1140,7 +1158,7 @@ void read_table_identifier(const char* db, const char *sql, const char *end, cha
}
}
skip_whitespace(&sql); // Space after first identifier
skip_whitespace(&sql); // Space after first identifier
if (*sql != '.')
{
@ -1151,7 +1169,7 @@ void read_table_identifier(const char* db, const char *sql, const char *end, cha
{
// Explicit database, skip the period
sql++;
skip_whitespace(&sql); // Space after first identifier
skip_whitespace(&sql); // Space after first identifier
const char* id_start;
int id_len = 0;
@ -1183,7 +1201,7 @@ void make_avro_token(char* dest, const char* src, int length)
length--;
}
const char *end = src;
const char* end = src;
for (int i = 0; i < length; i++)
{
@ -1228,14 +1246,14 @@ static bool not_column_operation(const char* tok, int len)
return false;
}
bool Rpl::table_create_alter(STableCreateEvent create, const char *sql, const char *end)
bool Rpl::table_create_alter(STableCreateEvent create, const char* sql, const char* end)
{
const char *tbl = strcasestr(sql, "table"), *def;
const char* tbl = strcasestr(sql, "table"), * def;
if ((def = strchr(tbl, ' ')))
{
int len = 0;
const char *tok = get_tok(def, &len, end);
const char* tok = get_tok(def, &len, end);
if (tok)
{
@ -1247,7 +1265,7 @@ bool Rpl::table_create_alter(STableCreateEvent create, const char *sql, const ch
while (tok && (tok = get_tok(tok + len, &len, end)))
{
const char *ptok = tok;
const char* ptok = tok;
int plen = len;
tok = get_tok(tok + len, &len, end);
@ -1283,7 +1301,7 @@ bool Rpl::table_create_alter(STableCreateEvent create, const char *sql, const ch
if (is_new)
{
char field_type[200] = ""; // Enough to hold all types
char field_type[200] = ""; // Enough to hold all types
int field_length = extract_type_length(tok + len, field_type);
create->columns.emplace_back(std::string(avro_token),
std::string(field_type),
@ -1319,16 +1337,14 @@ bool Rpl::table_create_alter(STableCreateEvent create, const char *sql, const ch
{
char avro_token[len + 1];
make_avro_token(avro_token, tok, len);
char field_type[200] = ""; // Enough to hold all types
char field_type[200] = ""; // Enough to hold all types
int field_length = extract_type_length(tok + len, field_type);
it->name = avro_token;
it->type = field_type;
it->length = field_length;
updates++;
}
}
}
tok = get_next_def(tok, end);
@ -1366,11 +1382,21 @@ bool Rpl::table_matches(const std::string& ident)
{
bool rval = false;
if (!m_match || pcre2_match(m_match, (PCRE2_SPTR)ident.c_str(), PCRE2_ZERO_TERMINATED,
0, 0, m_md_match, NULL) > 0)
if (!m_match || pcre2_match(m_match,
(PCRE2_SPTR)ident.c_str(),
PCRE2_ZERO_TERMINATED,
0,
0,
m_md_match,
NULL) > 0)
{
if (!m_exclude || pcre2_match(m_exclude, (PCRE2_SPTR)ident.c_str(), PCRE2_ZERO_TERMINATED,
0, 0, m_md_exclude, NULL) == PCRE2_ERROR_NOMATCH)
if (!m_exclude || pcre2_match(m_exclude,
(PCRE2_SPTR)ident.c_str(),
PCRE2_ZERO_TERMINATED,
0,
0,
m_md_exclude,
NULL) == PCRE2_ERROR_NOMATCH)
{
rval = true;
}

View File

@ -10,7 +10,7 @@
* of this software will be governed by version 2 or later of the General
* Public License.
*/
#pragma once
#pragma once
#include <vector>
#include <cstdint>
@ -28,12 +28,12 @@ typedef std::vector<uint8_t> Bytes;
// A GTID position
struct gtid_pos_t
{
gtid_pos_t():
timestamp(0),
domain(0),
server_id(0),
seq(0),
event_num(0)
gtid_pos_t()
: timestamp(0)
, domain(0)
, server_id(0)
, seq(0)
, event_num(0)
{
}
@ -46,20 +46,20 @@ struct gtid_pos_t
* an event inside a GTID event and it is used to
* rebuild GTID events in the correct order. */
void extract(const REP_HEADER& hdr, uint8_t* ptr);
bool parse(const char* str);
void extract(const REP_HEADER& hdr, uint8_t* ptr);
bool parse(const char* str);
static gtid_pos_t from_string(std::string str);
std::string to_string() const;
bool empty() const;
std::string to_string() const;
bool empty() const;
};
/** A single column in a CREATE TABLE statement */
struct Column
{
Column(std::string name, std::string type = "unknown", int length = -1):
name(name),
type(type),
length(length)
Column(std::string name, std::string type = "unknown", int length = -1)
: name(name)
, type(type)
, length(length)
{
}
@ -67,7 +67,7 @@ struct Column
std::string type;
int length;
json_t* to_json() const;
json_t* to_json() const;
static Column from_json(json_t* json);
};
@ -77,12 +77,12 @@ typedef std::shared_ptr<TableCreateEvent> STableCreateEvent;
/** A CREATE TABLE abstraction */
struct TableCreateEvent
{
TableCreateEvent(std::string db, std::string table, int version, std::vector<Column>&& cols):
columns(cols),
table(table),
database(db),
version(version),
was_used(false)
TableCreateEvent(std::string db, std::string table, int version, std::vector<Column>&& cols)
: columns(cols)
, table(table)
, database(db)
, version(version)
, was_used(false)
{
}
@ -112,11 +112,11 @@ struct TableCreateEvent
*/
static STableCreateEvent from_json(json_t* json);
std::vector<Column> columns;
std::string table;
std::string database;
int version; /**< How many versions of this table have been used */
bool was_used; /**< Has this schema been persisted to disk */
std::vector<Column> columns;
std::string table;
std::string database;
int version; /**< How many versions of this table have been used */
bool was_used; /**< Has this schema been persisted to disk */
};
/** A representation of a table map event read from a binary log. A table map
@ -125,15 +125,20 @@ struct TableCreateEvent
* some meta information on the columns. */
struct TableMapEvent
{
TableMapEvent(const std::string& db, const std::string& table, uint64_t id,
int version, Bytes&& cols, Bytes&& nulls, Bytes&& metadata):
database(db),
table(table),
id(id),
version(version),
column_types(cols),
null_bitmap(nulls),
column_metadata(metadata)
TableMapEvent(const std::string& db,
const std::string& table,
uint64_t id,
int version,
Bytes&& cols,
Bytes&& nulls,
Bytes&& metadata)
: database(db)
, table(table)
, id(id)
, version(version)
, column_types(cols)
, null_bitmap(nulls)
, column_metadata(metadata)
{
}
@ -151,7 +156,7 @@ struct TableMapEvent
Bytes column_metadata;
};
typedef std::shared_ptr<TableMapEvent> STableMapEvent;
typedef std::shared_ptr<TableMapEvent> STableMapEvent;
// Containers for the replication events
typedef std::unordered_map<std::string, STableCreateEvent> CreatedTables;
@ -226,7 +231,10 @@ public:
Rpl& operator=(const Rpl&) = delete;
// Construct a new replication stream transformer
Rpl(SERVICE* service, SRowEventHandler event_handler, pcre2_code* match, pcre2_code* exclude,
Rpl(SERVICE* service,
SRowEventHandler event_handler,
pcre2_code* match,
pcre2_code* exclude,
gtid_pos_t = {});
// Add a stored TableCreateEvent
@ -273,11 +281,11 @@ private:
pcre2_match_data* m_md_match;
pcre2_match_data* m_md_exclude;
void handle_query_event(REP_HEADER *hdr, uint8_t *ptr);
bool handle_table_map_event(REP_HEADER *hdr, uint8_t *ptr);
bool handle_row_event(REP_HEADER *hdr, uint8_t *ptr);
void handle_query_event(REP_HEADER* hdr, uint8_t* ptr);
bool handle_table_map_event(REP_HEADER* hdr, uint8_t* ptr);
bool handle_row_event(REP_HEADER* hdr, uint8_t* ptr);
STableCreateEvent table_create_copy(const char* sql, size_t len, const char* db);
bool save_and_replace_table_create(STableCreateEvent created);
bool table_create_alter(STableCreateEvent create, const char *sql, const char *end);
bool table_matches(const std::string& ident);
bool save_and_replace_table_create(STableCreateEvent created);
bool table_create_alter(STableCreateEvent create, const char* sql, const char* end);
bool table_matches(const std::string& ident);
};

File diff suppressed because one or more lines are too long