Convert avrorouter objects to C++

The instance and session objects are now C++ structs. The next pointers
for the sessions was removed as it is not the appropriate place to store
this information. This means that the client notification functionality is
broken in this commit.
This commit is contained in:
Markus Mäkelä
2018-05-22 09:28:53 +03:00
parent 354e9aeefc
commit 3095b18b14
7 changed files with 101 additions and 304 deletions

View File

@ -16,7 +16,7 @@
* MySQL replication binlog files and AVRO binary files
*/
#include "avrorouter.h"
#include "avrorouter.hh"
#include <avro/errors.h>
#include <ctype.h>
@ -76,35 +76,35 @@ static void errorReply(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session,
DCB *backend_dcb, mxs_error_action_t action, bool *succp);
static uint64_t getCapabilities(MXS_ROUTER* instance);
extern int MaxScaleUptime();
extern void avro_get_used_tables(AVRO_INSTANCE *router, DCB *dcb);
extern void avro_get_used_tables(Avro *router, DCB *dcb);
bool converter_func(void* data);
bool binlog_next_file_exists(const char* binlogdir, const char* binlog);
int blr_file_get_next_binlogname(const char *router);
bool avro_load_conversion_state(AVRO_INSTANCE *router);
void avro_load_metadata_from_schemas(AVRO_INSTANCE *router);
bool avro_load_conversion_state(Avro *router);
void avro_load_metadata_from_schemas(Avro *router);
int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata);
static bool ensure_dir_ok(const char* path, int mode);
bool avro_save_conversion_state(AVRO_INSTANCE *router);
bool avro_save_conversion_state(Avro *router);
static void stats_func(void *);
void avro_index_file(AVRO_INSTANCE *router, const char* path);
void avro_update_index(AVRO_INSTANCE* router);
static bool conversion_task_ctl(AVRO_INSTANCE *inst, bool start);
void avro_index_file(Avro *router, const char* path);
void avro_update_index(Avro* router);
static bool conversion_task_ctl(Avro *inst, bool start);
static SPINLOCK instlock;
static AVRO_INSTANCE *instances;
static Avro *instances;
bool avro_handle_convert(const MODULECMD_ARG *args, json_t** output)
{
bool rval = false;
if (strcmp(args->argv[1].value.string, "start") == 0 &&
conversion_task_ctl((AVRO_INSTANCE*)args->argv[0].value.service->router_instance, true))
conversion_task_ctl((Avro*)args->argv[0].value.service->router_instance, true))
{
MXS_NOTICE("Started conversion for service '%s'.", args->argv[0].value.service->name);
rval = true;
}
else if (strcmp(args->argv[1].value.string, "stop") == 0 &&
conversion_task_ctl((AVRO_INSTANCE*)args->argv[0].value.service->router_instance, false))
conversion_task_ctl((Avro*)args->argv[0].value.service->router_instance, false))
{
MXS_NOTICE("Stopped conversion for service '%s'.", args->argv[0].value.service->name);
rval = true;
@ -175,7 +175,7 @@ static bool do_unlink_with_pattern(const char* format, ...)
static bool avro_handle_purge(const MODULECMD_ARG *args, json_t** output)
{
AVRO_INSTANCE* inst = (AVRO_INSTANCE*)args->argv[0].value.service->router_instance;
Avro* inst = (Avro*)args->argv[0].value.service->router_instance;
// First stop the conversion service
conversion_task_ctl(inst, false);
@ -354,7 +354,7 @@ bool create_tables(sqlite3* handle)
return true;
}
static bool conversion_task_ctl(AVRO_INSTANCE *inst, bool start)
static bool conversion_task_ctl(Avro *inst, bool start)
{
bool rval = false;
@ -388,7 +388,7 @@ static bool conversion_task_ctl(AVRO_INSTANCE *inst, bool start)
* @param inst Avro router instance
* @param options The @c router_options of a binlogrouter instance
*/
void read_source_service_options(AVRO_INSTANCE *inst, const char** options,
void read_source_service_options(Avro *inst, const char** options,
MXS_CONFIG_PARAMETER* params)
{
for (MXS_CONFIG_PARAMETER* p = params; p; p = p->next)
@ -476,22 +476,19 @@ static void table_map_hfree(void* v)
static MXS_ROUTER *
createInstance(SERVICE *service, char **options)
{
AVRO_INSTANCE *inst;
int i;
Avro *inst = new (std::nothrow) Avro;
if ((inst = static_cast<AVRO_INSTANCE*>(MXS_CALLOC(1, sizeof(AVRO_INSTANCE)))) == NULL)
if (inst == NULL)
{
return NULL;
}
memset(&inst->stats, 0, sizeof(AVRO_ROUTER_STATS));
spinlock_init(&inst->lock);
spinlock_init(&inst->fileslock);
inst->service = service;
inst->binlog_fd = -1;
inst->current_pos = 4;
inst->binlog_position = 4;
inst->clients = NULL;
inst->next = NULL;
inst->lastEventTimestamp = 0;
inst->binlog_position = 0;
@ -551,64 +548,6 @@ createInstance(SERVICE *service, char **options)
inst->binlogdir = MXS_STRDUP_A(param->value);
}
if (options)
{
MXS_WARNING("Router options for Avrorouter are deprecated. Please convert them to parameters.");
for (i = 0; options[i]; i++)
{
char *value;
if ((value = strchr(options[i], '=')))
{
*value++ = '\0';
trim(value);
trim(options[i]);
if (strcmp(options[i], "binlogdir") == 0)
{
MXS_FREE(inst->binlogdir);
inst->binlogdir = MXS_STRDUP_A(value);
}
else if (strcmp(options[i], "avrodir") == 0)
{
MXS_FREE(inst->avrodir);
inst->avrodir = MXS_STRDUP_A(value);
}
else if (strcmp(options[i], "filestem") == 0)
{
MXS_FREE(inst->fileroot);
inst->fileroot = MXS_STRDUP_A(value);
}
else if (strcmp(options[i], "group_rows") == 0)
{
inst->row_target = atoi(value);
}
else if (strcmp(options[i], "group_trx") == 0)
{
inst->trx_target = atoi(value);
}
else if (strcmp(options[i], "start_index") == 0)
{
first_file = MXS_MAX(1, atoi(value));
}
else if (strcmp(options[i], "block_size") == 0)
{
inst->block_size = atoi(value);
}
else
{
MXS_WARNING("Unknown router option: '%s'", options[i]);
err = true;
}
}
else
{
MXS_WARNING("Unknown router option: '%s'", options[i]);
err = true;
}
}
}
if (inst->binlogdir == NULL)
{
MXS_ERROR("No 'binlogdir' option found in source service, in parameters or in router_options.");
@ -698,15 +637,6 @@ createInstance(SERVICE *service, char **options)
MXS_FREE(inst);
return NULL;
}
/**
* We have completed the creation of the instance data, so now
* insert this router instance into the linked list of routers
* that have been created with this module.
*/
spinlock_acquire(&instlock);
inst->next = instances;
instances = inst;
spinlock_release(&instlock);
/* AVRO converter init */
avro_load_conversion_state(inst);
@ -743,13 +673,10 @@ createInstance(SERVICE *service, char **options)
static MXS_ROUTER_SESSION *
newSession(MXS_ROUTER *instance, MXS_SESSION *session)
{
AVRO_INSTANCE *inst = (AVRO_INSTANCE *) instance;
AVRO_CLIENT *client;
Avro *inst = (Avro *) instance;
AvroSession *client = new (std::nothrow) AvroSession;
MXS_DEBUG("%lu [newSession] new router session with "
"session %p, and inst %p.", pthread_self(), session, inst);
if ((client = (AVRO_CLIENT *) MXS_CALLOC(1, sizeof(AVRO_CLIENT))) == NULL)
if (client == NULL)
{
return NULL;
}
@ -788,14 +715,6 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
sqlite3_close_v2(client->sqlite_handle);
}
/**
* Add this session to the list of active sessions.
*/
spinlock_acquire(&inst->lock);
client->next = inst->clients;
inst->clients = client;
spinlock_release(&inst->lock);
CHK_CLIENT_RSES(client);
return reinterpret_cast<MXS_ROUTER_SESSION*>(client);
@ -815,8 +734,8 @@ newSession(MXS_ROUTER *instance, MXS_SESSION *session)
*/
static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_client_ses)
{
AVRO_INSTANCE *router = (AVRO_INSTANCE *) router_instance;
AVRO_CLIENT *client = (AVRO_CLIENT *) router_client_ses;
Avro *router = (Avro *) router_instance;
AvroSession *client = (AvroSession *) router_client_ses;
ss_debug(int prev_val = )atomic_add(&router->stats.n_clients, -1);
ss_dassert(prev_val > 0);
@ -825,32 +744,7 @@ static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_
maxavro_file_close(client->file_handle);
sqlite3_close_v2(client->sqlite_handle);
/*
* Remove the slave session form the list of slaves that are using the
* router currently.
*/
spinlock_acquire(&router->lock);
if (router->clients == client)
{
router->clients = client->next;
}
else
{
AVRO_CLIENT *ptr = router->clients;
while (ptr != NULL && ptr->next != client)
{
ptr = ptr->next;
}
if (ptr != NULL)
{
ptr->next = client->next;
}
}
spinlock_release(&router->lock);
MXS_FREE(client);
delete client;
}
/**
@ -862,8 +756,8 @@ static void freeSession(MXS_ROUTER* router_instance, MXS_ROUTER_SESSION* router_
*/
static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session)
{
AVRO_INSTANCE *router = (AVRO_INSTANCE *) instance;
AVRO_CLIENT *client = (AVRO_CLIENT *) router_session;
Avro *router = (Avro *) instance;
AvroSession *client = (AvroSession *) router_session;
CHK_CLIENT_RSES(client);
@ -891,8 +785,8 @@ static void closeSession(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_sessio
static int
routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queue)
{
AVRO_INSTANCE *router = (AVRO_INSTANCE *) instance;
AVRO_CLIENT *client = (AVRO_CLIENT *) router_session;
Avro *router = (Avro *) instance;
AvroSession *client = (AvroSession *) router_session;
return avro_client_handle_request(router, client, queue);
}
@ -906,21 +800,10 @@ routeQuery(MXS_ROUTER *instance, MXS_ROUTER_SESSION *router_session, GWBUF *queu
static void
diagnostics(MXS_ROUTER *router, DCB *dcb)
{
AVRO_INSTANCE *router_inst = (AVRO_INSTANCE *) router;
AVRO_CLIENT *session;
int i = 0;
Avro *router_inst = (Avro *) router;
char buf[40];
struct tm tm;
spinlock_acquire(&router_inst->lock);
session = router_inst->clients;
while (session)
{
i++;
session = session->next;
}
spinlock_release(&router_inst->lock);
dcb_printf(dcb, "\tAVRO Converter infofile: %s/%s\n",
router_inst->avrodir, AVRO_PROGRESS_FILE);
dcb_printf(dcb, "\tAVRO files directory: %s\n",
@ -950,54 +833,6 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
dcb_printf(dcb, "\tNumber of AVRO clients: %u\n",
router_inst->stats.n_clients);
if (router_inst->clients)
{
dcb_printf(dcb, "\tClients:\n");
spinlock_acquire(&router_inst->lock);
session = router_inst->clients;
while (session)
{
char sync_marker_hex[SYNC_MARKER_SIZE * 2 + 1];
dcb_printf(dcb, "\t\tClient UUID: %s\n", session->uuid);
dcb_printf(dcb, "\t\tClient_host_port: [%s]:%d\n",
session->dcb->remote, dcb_get_port(session->dcb));
dcb_printf(dcb, "\t\tUsername: %s\n", session->dcb->user);
dcb_printf(dcb, "\t\tClient DCB: %p\n", session->dcb);
dcb_printf(dcb, "\t\tClient protocol: %s\n",
session->dcb->service->ports->protocol);
dcb_printf(dcb, "\t\tClient Output Format: %s\n",
avro_client_ouput[session->format]);
dcb_printf(dcb, "\t\tState: %s\n",
avro_client_states[session->state]);
dcb_printf(dcb, "\t\tAvro file: %s\n", session->avro_binfile);
gw_bin2hex(sync_marker_hex, session->avro_file.sync, SYNC_MARKER_SIZE);
dcb_printf(dcb, "\t\tAvro file SyncMarker: %s\n", sync_marker_hex);
dcb_printf(dcb, "\t\tAvro file last read block: %lu\n",
session->avro_file.blocks_read);
dcb_printf(dcb, "\t\tAvro file last read record: %lu\n",
session->avro_file.records_read);
if (session->gtid_start.domain > 0 || session->gtid_start.server_id > 0 ||
session->gtid_start.seq > 0)
{
dcb_printf(dcb, "\t\tRequested GTID: %lu-%lu-%lu\n",
session->gtid_start.domain, session->gtid_start.server_id,
session->gtid_start.seq);
}
dcb_printf(dcb, "\t\tCurrent GTID: %lu-%lu-%lu\n",
session->gtid.domain, session->gtid.server_id,
session->gtid.seq);
dcb_printf(dcb, "\t\t--------------------\n\n");
session = session->next;
}
spinlock_release(&router_inst->lock);
}
}
/**
@ -1007,7 +842,7 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
*/
static json_t* diagnostics_json(const MXS_ROUTER *router)
{
AVRO_INSTANCE *router_inst = (AVRO_INSTANCE *)router;
Avro *router_inst = (Avro *)router;
json_t* rval = json_object();
@ -1027,44 +862,6 @@ static json_t* diagnostics_json(const MXS_ROUTER *router)
json_object_set_new(rval, "gtid_event_number", json_integer(router_inst->gtid.event_num));
json_object_set_new(rval, "clients", json_integer(router_inst->stats.n_clients));
if (router_inst->clients)
{
json_t* arr = json_array();
spinlock_acquire(&router_inst->lock);
for (AVRO_CLIENT *session = router_inst->clients; session; session = session->next)
{
json_t* client = json_object();
json_object_set_new(client, "uuid", json_string(session->uuid));
json_object_set_new(client, "host", json_string(session->dcb->remote));
json_object_set_new(client, "port", json_integer(dcb_get_port(session->dcb)));
json_object_set_new(client, "user", json_string(session->dcb->user));
json_object_set_new(client, "format", json_string(avro_client_ouput[session->format]));
json_object_set_new(client, "state", json_string(avro_client_states[session->state]));
json_object_set_new(client, "avrofile", json_string(session->avro_binfile));
json_object_set_new(client, "avrofile_last_block",
json_integer(session->avro_file.blocks_read));
json_object_set_new(client, "avrofile_last_record",
json_integer(session->avro_file.records_read));
if (session->gtid_start.domain > 0 || session->gtid_start.server_id > 0 ||
session->gtid_start.seq > 0)
{
snprintf(pathbuf, sizeof(pathbuf), "%lu-%lu-%lu", session->gtid_start.domain,
session->gtid_start.server_id, session->gtid_start.seq);
json_object_set_new(client, "requested_gtid", json_string(pathbuf));
}
snprintf(pathbuf, sizeof(pathbuf), "%lu-%lu-%lu", session->gtid.domain,
session->gtid.server_id, session->gtid.seq);
json_object_set_new(client, "current_gtid", json_string(pathbuf));
json_array_append_new(arr, client);
}
spinlock_release(&router_inst->lock);
json_object_set_new(rval, "clients", arr);
}
return rval;
}
@ -1120,7 +917,7 @@ static uint64_t getCapabilities(MXS_ROUTER* instance)
*/
bool converter_func(void* data)
{
AVRO_INSTANCE* router = (AVRO_INSTANCE*) data;
Avro* router = (Avro*) data;
bool ok = true;
avro_binlog_end_t binlog_end = AVRO_OK;

View File

@ -15,7 +15,7 @@
* @file avro_client.c - contains code for the AVRO router to client communication
*/
#include "avrorouter.h"
#include "avrorouter.hh"
#include <stdio.h>
#include <stdlib.h>
@ -37,11 +37,11 @@ extern char *blr_extract_column(GWBUF *buf, int col);
extern uint32_t extract_field(uint8_t *src, int bits);
/* AVRO */
static int avro_client_do_registration(AVRO_INSTANCE *, AVRO_CLIENT *, GWBUF *);
static int avro_client_do_registration(Avro *, AvroSession *, GWBUF *);
int avro_client_callback(DCB *dcb, DCB_REASON reason, void *data);
static void avro_client_process_command(AVRO_INSTANCE *router, AVRO_CLIENT *client, GWBUF *queue);
static bool avro_client_stream_data(AVRO_CLIENT *client);
void avro_notify_client(AVRO_CLIENT *client);
static void avro_client_process_command(Avro *router, AvroSession *client, GWBUF *queue);
static bool avro_client_stream_data(AvroSession *client);
void avro_notify_client(AvroSession *client);
void poll_fake_write_event(DCB *dcb);
GWBUF* read_avro_json_schema(const char *avrofile, const char* dir);
GWBUF* read_avro_binary_schema(const char *avrofile, const char* dir);
@ -56,7 +56,7 @@ const char* get_avrofile_name(const char *file_ptr, int data_len, char *dest);
* @return 1 on success, 0 on error or failure
*/
int
avro_client_handle_request(AVRO_INSTANCE *router, AVRO_CLIENT *client, GWBUF *queue)
avro_client_handle_request(Avro *router, AvroSession *client, GWBUF *queue)
{
int rval = 1;
@ -117,7 +117,7 @@ avro_client_handle_request(AVRO_INSTANCE *router, AVRO_CLIENT *client, GWBUF *qu
*
*/
static int
avro_client_do_registration(AVRO_INSTANCE *router, AVRO_CLIENT *client, GWBUF *data)
avro_client_do_registration(Avro *router, AvroSession *client, GWBUF *data)
{
const char reg_uuid[] = "REGISTER UUID=";
const int reg_uuid_len = strlen(reg_uuid);
@ -307,7 +307,7 @@ void add_used_tables(sqlite3 *handle, json_t* obj, gtid_pos_t* gtid)
* @param router The AVRO router instance
* @param dcb The dcb to write data
*/
void avro_get_used_tables(AVRO_INSTANCE *router, DCB* dcb)
void avro_get_used_tables(Avro *router, DCB* dcb)
{
sqlite3 *handle = router->sqlite_handle;
char sql[AVRO_SQL_BUFFER_SIZE];
@ -382,7 +382,7 @@ void add_timestamp(sqlite3 *handle, json_t* obj, gtid_pos_t* gtid)
* @param router Router instance
* @param dcb Client DCB
*/
void send_gtid_info(AVRO_INSTANCE *router, gtid_pos_t *gtid_pos, DCB *dcb)
void send_gtid_info(Avro *router, gtid_pos_t *gtid_pos, DCB *dcb)
{
json_t *obj = json_object();
@ -432,7 +432,7 @@ bool file_in_dir(const char *dir, const char *file)
*
*/
static void
avro_client_process_command(AVRO_INSTANCE *router, AVRO_CLIENT *client, GWBUF *queue)
avro_client_process_command(Avro *router, AvroSession *client, GWBUF *queue)
{
const char req_data[] = "REQUEST-DATA";
const char req_last_gtid[] = "QUERY-LAST-TRANSACTION";
@ -568,7 +568,7 @@ static int send_row(DCB *dcb, json_t* row)
return rc;
}
static void set_current_gtid(AVRO_CLIENT *client, json_t *row)
static void set_current_gtid(AvroSession *client, json_t *row)
{
json_t *obj = json_object_get(row, avro_sequence);
ss_dassert(json_is_integer(obj));
@ -590,7 +590,7 @@ static void set_current_gtid(AVRO_CLIENT *client, json_t *row)
* @param dcb DCB to stream to
* @return True if more data is readable, false if all data was sent
*/
static bool stream_json(AVRO_CLIENT *client)
static bool stream_json(AvroSession *client)
{
int bytes = 0;
MAXAVRO_FILE *file = client->file_handle;
@ -620,7 +620,7 @@ static bool stream_json(AVRO_CLIENT *client)
* @param dcb DCB to stream to
* @return True if streaming was successful, false if an error occurred
*/
static bool stream_binary(AVRO_CLIENT *client)
static bool stream_binary(AvroSession *client)
{
GWBUF *buffer;
uint64_t bytes = 0;
@ -660,7 +660,7 @@ static int sqlite_cb(void* data, int rows, char** values, char** names)
static const char select_template[] = "SELECT max(position) FROM gtid WHERE domain=%lu "
"AND server_id=%lu AND sequence <= %lu AND avrofile=\"%s\";";
static bool seek_to_index_pos(AVRO_CLIENT *client, MAXAVRO_FILE* file)
static bool seek_to_index_pos(AvroSession *client, MAXAVRO_FILE* file)
{
char *name = strrchr(client->file_handle->filename, '/');
ss_dassert(name);
@ -696,7 +696,7 @@ static bool seek_to_index_pos(AVRO_CLIENT *client, MAXAVRO_FILE* file)
* @param client
* @param file
*/
static bool seek_to_gtid(AVRO_CLIENT *client, MAXAVRO_FILE* file)
static bool seek_to_gtid(AvroSession *client, MAXAVRO_FILE* file)
{
bool seeking = true;
@ -755,10 +755,10 @@ static bool seek_to_gtid(AVRO_CLIENT *client, MAXAVRO_FILE* file)
* @param avro_file The requested AVRO file
* @return True if more data needs to be read
*/
static bool avro_client_stream_data(AVRO_CLIENT *client)
static bool avro_client_stream_data(AvroSession *client)
{
bool read_more = false;
AVRO_INSTANCE *router = client->router;
Avro *router = client->router;
if (strnlen(client->avro_binfile, 1))
{
@ -891,7 +891,7 @@ GWBUF* read_avro_binary_schema(const char *avrofile, const char* dir)
* @param client Avro client session
* @param fullname Absolute path to the file to rotate to
*/
static void rotate_avro_file(AVRO_CLIENT *client, char *fullname)
static void rotate_avro_file(AvroSession *client, char *fullname)
{
char *filename = strrchr(fullname, '/') + 1;
size_t len = strlen(filename);
@ -965,7 +965,7 @@ int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata)
{
if (reason == DCB_REASON_DRAINED)
{
AVRO_CLIENT *client = (AVRO_CLIENT*)userdata;
AvroSession *client = (AvroSession*)userdata;
spinlock_acquire(&client->catch_lock);
if (client->cstate & AVRO_CS_BUSY)
@ -1043,7 +1043,7 @@ int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata)
*
* @param client Client to notify
*/
void avro_notify_client(AVRO_CLIENT *client)
void avro_notify_client(AvroSession *client)
{
/* Add fake event that will call the avro_client_callback() routine */
poll_fake_write_event(client->dcb);

View File

@ -22,7 +22,7 @@
* uses.
*/
#include "avrorouter.h"
#include "avrorouter.hh"
#include <maxscale/query_classifier.h>
#include <binlog_common.h>
@ -39,12 +39,12 @@
static const char *statefile_section = "avro-conversion";
static const char *ddl_list_name = "table-ddl.list";
void handle_query_event(AVRO_INSTANCE *router, REP_HEADER *hdr,
void handle_query_event(Avro *router, REP_HEADER *hdr,
int *pending_transaction, uint8_t *ptr);
bool is_create_table_statement(AVRO_INSTANCE *router, char* ptr, size_t len);
void avro_notify_client(AVRO_CLIENT *client);
void avro_update_index(AVRO_INSTANCE* router);
void update_used_tables(AVRO_INSTANCE* router);
bool is_create_table_statement(Avro *router, char* ptr, size_t len);
void avro_notify_client(AvroSession *client);
void avro_update_index(Avro* router);
void update_used_tables(Avro* router);
TABLE_CREATE* table_create_from_schema(const char* file, const char* db,
const char* table, int version);
@ -158,7 +158,7 @@ AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema, cons
* @return True if the file was written successfully to disk
*
*/
bool avro_save_conversion_state(AVRO_INSTANCE *router)
bool avro_save_conversion_state(Avro *router)
{
FILE *config_file;
char filename[PATH_MAX + 1];
@ -208,7 +208,7 @@ bool avro_save_conversion_state(AVRO_INSTANCE *router)
*/
static int conv_state_handler(void* data, const char* section, const char* key, const char* value)
{
AVRO_INSTANCE *router = (AVRO_INSTANCE*) data;
Avro *router = (Avro*) data;
if (strcmp(section, statefile_section) == 0)
{
@ -261,7 +261,7 @@ static int conv_state_handler(void* data, const char* section, const char* key,
* @param router Avro router instance
* @return True if the stored state was loaded successfully
*/
bool avro_load_conversion_state(AVRO_INSTANCE *router)
bool avro_load_conversion_state(Avro *router)
{
char filename[PATH_MAX + 1];
bool rval = false;
@ -331,7 +331,7 @@ void avro_table_free(AVRO_TABLE *table)
* @return AVRO_OK if the next file exists, AVRO_LAST_FILE if this is the last
* available file.
*/
static avro_binlog_end_t rotate_to_next_file_if_exists(AVRO_INSTANCE* router, uint64_t pos, bool stop_seen)
static avro_binlog_end_t rotate_to_next_file_if_exists(Avro* router, uint64_t pos, bool stop_seen)
{
avro_binlog_end_t rval = AVRO_LAST_FILE;
@ -386,7 +386,7 @@ static avro_binlog_end_t rotate_to_next_file_if_exists(AVRO_INSTANCE* router, ui
* @param pos Current position, only used for logging
* @param next_binlog The next file to rotate to
*/
static void rotate_to_file(AVRO_INSTANCE* router, uint64_t pos, const char *next_binlog)
static void rotate_to_file(Avro* router, uint64_t pos, const char *next_binlog)
{
/** Binlog file is processed, prepare for next one */
MXS_NOTICE("End of binlog file [%s] at %lu. Rotating to file [%s].",
@ -404,7 +404,7 @@ static void rotate_to_file(AVRO_INSTANCE* router, uint64_t pos, const char *next
* @param pos Starting position of the event header
* @return The event data or NULL if an error occurred
*/
static GWBUF* read_event_data(AVRO_INSTANCE *router, REP_HEADER* hdr, uint64_t pos)
static GWBUF* read_event_data(Avro *router, REP_HEADER* hdr, uint64_t pos)
{
GWBUF* result;
/* Allocate a GWBUF for the event */
@ -446,11 +446,13 @@ static GWBUF* read_event_data(AVRO_INSTANCE *router, REP_HEADER* hdr, uint64_t p
return result;
}
void notify_all_clients(AVRO_INSTANCE *router)
void notify_all_clients(Avro *router)
{
AVRO_CLIENT *client = router->clients;
AvroSession *client = router->clients;
int notified = 0;
/* TODO: Use dcb_foreach or some similar mechanism for this
while (client)
{
spinlock_acquire(&client->catch_lock);
@ -463,6 +465,7 @@ void notify_all_clients(AVRO_INSTANCE *router)
client = client->next;
}
*/
if (notified > 0)
{
@ -470,7 +473,7 @@ void notify_all_clients(AVRO_INSTANCE *router)
}
}
void do_checkpoint(AVRO_INSTANCE *router, uint64_t *total_rows, uint64_t *total_commits)
void do_checkpoint(Avro *router, uint64_t *total_rows, uint64_t *total_commits)
{
update_used_tables(router);
avro_flush_all_tables(router, AVROROUTER_FLUSH);
@ -492,7 +495,7 @@ void do_checkpoint(AVRO_INSTANCE *router, uint64_t *total_rows, uint64_t *total_
* @return How the binlog was closed
* @see enum avro_binlog_end
*/
avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router)
avro_binlog_end_t avro_read_all_events(Avro *router)
{
uint8_t hdbuf[BINLOG_EVENT_HDR_LEN];
unsigned long long pos = router->current_pos;
@ -785,7 +788,7 @@ avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router)
*
* @param router Router instance
*/
void avro_load_metadata_from_schemas(AVRO_INSTANCE *router)
void avro_load_metadata_from_schemas(Avro *router)
{
char path[PATH_MAX + 1];
snprintf(path, sizeof(path), "%s/*.avsc", router->avrodir);
@ -852,7 +855,7 @@ void avro_load_metadata_from_schemas(AVRO_INSTANCE *router)
* @brief Flush all Avro records to disk
* @param router Avro router instance
*/
void avro_flush_all_tables(AVRO_INSTANCE *router, enum avrorouter_file_op flush)
void avro_flush_all_tables(Avro *router, enum avrorouter_file_op flush)
{
HASHITERATOR *iter = hashtable_iterator(router->open_tables);
@ -887,7 +890,7 @@ void avro_flush_all_tables(AVRO_INSTANCE *router, enum avrorouter_file_op flush)
* @param len Statement length
* @return True if the statement creates a new table
*/
bool is_create_table_statement(AVRO_INSTANCE *router, char* ptr, size_t len)
bool is_create_table_statement(Avro *router, char* ptr, size_t len)
{
int rc = 0;
pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(router->create_table_re, NULL);
@ -939,7 +942,7 @@ bool is_create_as_statement(const char* ptr, size_t len)
* @param len Statement length
* @return True if the statement alters a table
*/
bool is_alter_table_statement(AVRO_INSTANCE *router, char* ptr, size_t len)
bool is_alter_table_statement(Avro *router, char* ptr, size_t len)
{
int rc = 0;
pcre2_match_data *mdata = pcre2_match_data_create_from_pattern(router->alter_table_re, NULL);
@ -969,12 +972,11 @@ bool is_alter_table_statement(AVRO_INSTANCE *router, char* ptr, size_t len)
* @param created Created table
* @return False if an error occurred and true if successful
*/
bool save_and_replace_table_create(AVRO_INSTANCE *router, TABLE_CREATE *created)
bool save_and_replace_table_create(Avro *router, TABLE_CREATE *created)
{
char table_ident[MYSQL_TABLE_MAXLEN + MYSQL_DATABASE_MAXLEN + 2];
snprintf(table_ident, sizeof(table_ident), "%s.%s", created->database, created->table);
spinlock_acquire(&router->lock); // Is this necessary?
TABLE_CREATE *old = static_cast<TABLE_CREATE*>(hashtable_fetch(router->created_tables, table_ident));
if (old)
@ -999,7 +1001,6 @@ bool save_and_replace_table_create(AVRO_INSTANCE *router, TABLE_CREATE *created)
hashtable_add(router->created_tables, table_ident, created);
ss_dassert(created->columns > 0);
spinlock_release(&router->lock);
return true;
}
@ -1056,7 +1057,7 @@ static void strip_executable_comments(char *sql, int* len)
* @param pending_transaction Pointer where status of pending transaction is stored
* @param ptr Pointer to the start of the event payload
*/
void handle_query_event(AVRO_INSTANCE *router, REP_HEADER *hdr, int *pending_transaction, uint8_t *ptr)
void handle_query_event(Avro *router, REP_HEADER *hdr, int *pending_transaction, uint8_t *ptr)
{
int dblen = ptr[DBNM_OFF];
int vblklen = ptr[VBLK_OFF];

View File

@ -24,7 +24,7 @@
* The index is stored as an SQLite3 database.
*/
#include "avrorouter.h"
#include "avrorouter.hh"
#include <maxscale/debug.h>
#include <glob.h>
@ -62,7 +62,7 @@ int index_query_cb(void *data, int rows, char** values, char** names)
return 0;
}
void avro_index_file(AVRO_INSTANCE *router, const char* filename)
void avro_index_file(Avro *router, const char* filename)
{
MAXAVRO_FILE *file = maxavro_file_open(filename);
@ -178,7 +178,7 @@ void avro_index_file(AVRO_INSTANCE *router, const char* filename)
* manner.
* @param data The router instance
*/
void avro_update_index(AVRO_INSTANCE* router)
void avro_update_index(Avro* router)
{
char path[PATH_MAX + 1];
snprintf(path, sizeof(path), "%s/*.avro", router->avrodir);
@ -209,7 +209,7 @@ static const char *insert_sql = "INSERT OR IGNORE INTO " MEMORY_TABLE_NAME
* @param router Avro router instance
* @param table Table to add
*/
void add_used_table(AVRO_INSTANCE* router, const char* table)
void add_used_table(Avro* router, const char* table)
{
char sql[AVRO_SQL_BUFFER_SIZE], *errmsg;
snprintf(sql, sizeof(sql), insert_sql, router->gtid.domain, router->gtid.server_id,
@ -232,7 +232,7 @@ void add_used_table(AVRO_INSTANCE* router, const char* table)
*
* @param router Avro router instance
*/
void update_used_tables(AVRO_INSTANCE* router)
void update_used_tables(Avro* router)
{
char *errmsg;

View File

@ -11,7 +11,7 @@
* Public License.
*/
#include "avrorouter.h"
#include "avrorouter.hh"
#include <maxscale/mysql_utils.h>
#include <jansson.h>
@ -33,8 +33,8 @@ static bool warn_large_enumset = false; /**< Remove when support for ENUM/SET va
uint8_t* process_row_event_data(TABLE_MAP *map, TABLE_CREATE *create,
avro_value_t *record, uint8_t *ptr,
uint8_t *columns_present, uint8_t *end);
void notify_all_clients(AVRO_INSTANCE *router);
void add_used_table(AVRO_INSTANCE* router, const char* table);
void notify_all_clients(Avro *router);
void add_used_table(Avro* router, const char* table);
/**
* @brief Get row event name
@ -92,7 +92,7 @@ static const char* codec_to_string(enum mxs_avro_codec_type type)
* @param hdr Replication header
* @param ptr Pointer to event payload
*/
bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
{
bool rval = false;
uint64_t id;
@ -190,7 +190,7 @@ bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr
* @param event_type Event type
* @param record Record to prepare
*/
static void prepare_record(AVRO_INSTANCE *router, REP_HEADER *hdr,
static void prepare_record(Avro *router, REP_HEADER *hdr,
int event_type, avro_value_t *record)
{
avro_value_t field;
@ -225,7 +225,7 @@ static void prepare_record(AVRO_INSTANCE *router, REP_HEADER *hdr,
* @param ptr Pointer to the start of the event
* @return True on succcess, false on error
*/
bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr)
bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr)
{
bool rval = false;
uint8_t *start = ptr;

View File

@ -15,7 +15,7 @@
* @file avro_schema.c - Avro schema related functions
*/
#include "avrorouter.h"
#include "avrorouter.hh"
#include <maxscale/mysql_utils.h>
#include <jansson.h>
@ -972,7 +972,7 @@ static bool extract_create_like_identifier(const char* sql, size_t len, char* ta
/**
* Create a table from another table
*/
TABLE_CREATE* table_create_copy(AVRO_INSTANCE *router, const char* sql, size_t len, const char* db)
TABLE_CREATE* table_create_copy(Avro *router, const char* sql, size_t len, const char* db)
{
TABLE_CREATE* rval = NULL;
char target[MYSQL_TABLE_MAXLEN + 1] = "";

View File

@ -219,11 +219,13 @@ typedef struct gtid_pos
* rebuild GTID events in the correct order. */
} gtid_pos_t;
struct Avro;
/**
* The client structure used within this router.
* This represents the clients that are requesting AVRO files from MaxScale.
*/
typedef struct avro_client
struct AvroSession
{
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_top;
@ -234,8 +236,7 @@ typedef struct avro_client
char *uuid; /*< Client UUID */
SPINLOCK catch_lock; /*< Event catchup lock */
SPINLOCK file_lock; /*< Protects rses_deleted */
struct avro_instance *router; /*< Pointer to the owning router */
struct avro_client *next;
Avro* router; /*< Pointer to the owning router */
MAXAVRO_FILE *file_handle; /*< Current open file handle */
uint64_t last_sent_pos; /*< The last record we sent */
AVRO_CLIENT_STATS stats; /*< Slave statistics */
@ -250,16 +251,14 @@ typedef struct avro_client
#if defined(SS_DEBUG)
skygw_chk_t rses_chk_tail;
#endif
} AVRO_CLIENT;
};
/**
* * The per instance data for the AVRO router.
* */
typedef struct avro_instance
struct Avro
{
SERVICE *service; /*< Pointer to the service using this router */
AVRO_CLIENT *clients; /*< Link list of all the CDC client connections */
SPINLOCK lock; /*< Spinlock for the instance data */
int initbinlog; /*< Initial binlog file number */
char *fileroot; /*< Root of binlog filename */
unsigned int state; /*< State of the AVRO router */
@ -299,30 +298,30 @@ typedef struct avro_instance
uint64_t block_size; /**< Avro datablock size */
enum mxs_avro_codec_type codec; /**< Avro codec type, defaults to `null` */
struct avro_instance *next;
} AVRO_INSTANCE;
};
extern void read_table_info(uint8_t *ptr, uint8_t post_header_len, uint64_t *table_id,
char* dest, size_t len);
extern TABLE_MAP *table_map_alloc(uint8_t *ptr, uint8_t hdr_len, TABLE_CREATE* create);
extern void table_map_free(TABLE_MAP *map);
extern TABLE_CREATE* table_create_alloc(char* ident, const char* sql, int len);
extern TABLE_CREATE* table_create_copy(AVRO_INSTANCE *router, const char* sql, size_t len, const char* db);
extern TABLE_CREATE* table_create_copy(Avro *router, const char* sql, size_t len, const char* db);
extern void table_create_free(TABLE_CREATE* value);
extern bool table_create_save(TABLE_CREATE *create, const char *filename);
extern bool table_create_alter(TABLE_CREATE *create, const char *sql, const char *end);
extern void read_table_identifier(const char* db, const char *sql, const char *end, char *dest, int size);
extern int avro_client_handle_request(AVRO_INSTANCE *, AVRO_CLIENT *, GWBUF *);
extern void avro_client_rotate(AVRO_INSTANCE *router, AVRO_CLIENT *client, uint8_t *ptr);
extern int avro_client_handle_request(Avro *, AvroSession *, GWBUF *);
extern void avro_client_rotate(Avro *router, AvroSession *client, uint8_t *ptr);
extern bool avro_open_binlog(const char *binlogdir, const char *file, int *fd);
extern void avro_close_binlog(int fd);
extern avro_binlog_end_t avro_read_all_events(AVRO_INSTANCE *router);
extern avro_binlog_end_t avro_read_all_events(Avro *router);
extern AVRO_TABLE* avro_table_alloc(const char* filepath, const char* json_schema,
const char *codec, size_t block_size);
extern void avro_table_free(AVRO_TABLE *table);
extern char* json_new_schema_from_table(TABLE_MAP *map);
extern void save_avro_schema(const char *path, const char* schema, TABLE_MAP *map);
extern bool handle_table_map_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
extern bool handle_row_event(AVRO_INSTANCE *router, REP_HEADER *hdr, uint8_t *ptr);
extern bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
extern bool handle_row_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr);
enum avrorouter_file_op
{
@ -336,7 +335,7 @@ enum avrorouter_file_op
* @param router Router instance
* @param flush AVROROUTER_SYNC for sync only or AVROROUTER_FLUSH for full flush
*/
extern void avro_flush_all_tables(AVRO_INSTANCE *router, enum avrorouter_file_op flush);
extern void avro_flush_all_tables(Avro *router, enum avrorouter_file_op flush);
#define AVRO_CLIENT_UNREGISTERED 0x0000
#define AVRO_CLIENT_REGISTERED 0x0001