Move Avro creation into Avro::create

The Avro instance is now created inside a static class method. This brings
it in line with how other modules create instances.

Converted all strings to std::string and updated their usage.
This commit is contained in:
Markus Mäkelä
2018-05-23 07:35:43 +03:00
parent 2a76a36505
commit ec919c367b
6 changed files with 183 additions and 319 deletions

View File

@ -53,8 +53,6 @@ using namespace maxscale;
#define BINLOG_NAMEFMT "%s.%06d"
#endif
#define AVRO_TASK_DELAY_MAX 15
/** For detection of CREATE/ALTER TABLE statements */
static const char* create_table_regex =
"(?i)create[a-z0-9[:space:]_]+table";
@ -82,7 +80,7 @@ int blr_file_get_next_binlogname(const char *router);
bool avro_load_conversion_state(Avro *router);
void avro_load_metadata_from_schemas(Avro *router);
int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata);
static bool ensure_dir_ok(const char* path, int mode);
static bool ensure_dir_ok(std::string path, int mode);
bool avro_save_conversion_state(Avro *router);
static void stats_func(void *);
void avro_index_file(Avro *router, const char* path);
@ -180,10 +178,10 @@ static bool avro_handle_purge(const MODULECMD_ARG *args, json_t** output)
conversion_task_ctl(inst, false);
// Then delete the files
return do_unlink("%s/%s", inst->avrodir, AVRO_PROGRESS_FILE) && // State file
do_unlink("/%s/%s", inst->avrodir, avro_index_name) && // Index database
do_unlink_with_pattern("/%s/*.avro", inst->avrodir) && // .avro files
do_unlink_with_pattern("/%s/*.avsc", inst->avrodir); // .avsc files
return do_unlink("%s/%s", inst->avrodir.c_str(), AVRO_PROGRESS_FILE) && // State file
do_unlink("/%s/%s", inst->avrodir.c_str(), avro_index_name) && // Index database
do_unlink_with_pattern("/%s/*.avro", inst->avrodir.c_str()) && // .avro files
do_unlink_with_pattern("/%s/*.avsc", inst->avrodir.c_str()); // .avsc files
}
/**
@ -372,7 +370,7 @@ static bool conversion_task_ctl(Avro *inst, bool start)
if (start)
{
inst->task_handle = worker->delayed_call(inst->task_delay * 1000, converter_func, inst);
inst->task_handle = worker->delayed_call(1000, converter_func, inst);
}
rval = true;
@ -392,20 +390,20 @@ static bool conversion_task_ctl(Avro *inst, bool start)
* @param inst Avro router instance
* @param options The @c router_options of a binlogrouter instance
*/
void read_source_service_options(Avro *inst, const char** options,
MXS_CONFIG_PARAMETER* params)
void Avro::read_source_service_options(SERVICE* source)
{
char** options = source->routerOptions;
MXS_CONFIG_PARAMETER* params = source->svc_config_param;
for (MXS_CONFIG_PARAMETER* p = params; p; p = p->next)
{
if (strcmp(p->name, "binlogdir") == 0)
{
MXS_FREE(inst->binlogdir);
inst->binlogdir = MXS_STRDUP_A(p->value);
binlogdir = p->value;
}
else if (strcmp(p->name, "filestem") == 0)
{
MXS_FREE(inst->fileroot);
inst->fileroot = MXS_STRDUP_A(p->value);
filestem = p->value;
}
}
@ -424,13 +422,11 @@ void read_source_service_options(Avro *inst, const char** options,
if (strcmp(option, "binlogdir") == 0)
{
MXS_FREE(inst->binlogdir);
inst->binlogdir = MXS_STRDUP_A(value);
binlogdir = value;
}
else if (strcmp(option, "filestem") == 0)
{
MXS_FREE(inst->fileroot);
inst->fileroot = MXS_STRDUP_A(value);
filestem = value;
}
}
}
@ -450,45 +446,17 @@ void read_source_service_options(Avro *inst, const char** options,
*
* @return The instance data for this new instance
*/
static MXS_ROUTER *
createInstance(SERVICE *service, char **options)
MXS_ROUTER* createInstance(SERVICE *service, char **options)
{
Avro *inst = new (std::nothrow) Avro;
return Avro::create(service);
}
if (inst == NULL)
{
return NULL;
}
memset(&inst->stats, 0, sizeof(AVRO_ROUTER_STATS));
inst->service = service;
inst->binlog_fd = -1;
inst->current_pos = 4;
inst->binlog_position = 4;
inst->lastEventTimestamp = 0;
inst->binlog_position = 0;
inst->task_delay = 1;
inst->row_count = 0;
inst->trx_count = 0;
inst->binlogdir = NULL;
MXS_CONFIG_PARAMETER *params = service->svc_config_param;
inst->avrodir = MXS_STRDUP_A(config_get_string(params, "avrodir"));
inst->fileroot = MXS_STRDUP_A(config_get_string(params, "filestem"));
inst->row_target = config_get_integer(params, "group_rows");
inst->trx_target = config_get_integer(params, "group_trx");
inst->codec = static_cast<mxs_avro_codec_type>(config_get_enum(params, "codec", codec_values));
int first_file = config_get_integer(params, "start_index");
inst->block_size = config_get_size(params, "block_size");
MXS_CONFIG_PARAMETER *param = config_get_param(params, "source");
inst->gtid.domain = 0;
inst->gtid.event_num = 0;
inst->gtid.seq = 0;
inst->gtid.server_id = 0;
inst->gtid.timestamp = 0;
//static
Avro* Avro::create(SERVICE* service)
{
bool err = false;
SERVICE* source_service = NULL;
MXS_CONFIG_PARAMETER *param = config_get_param(service->svc_config_param, "source");
if (param)
{
@ -499,109 +467,97 @@ createInstance(SERVICE *service, char **options)
{
if (strcmp(source->routerModule, "binlogrouter") == 0)
{
MXS_NOTICE("[%s] Using configuration options from service '%s'.",
service->name, source->name);
read_source_service_options(inst, (const char**)source->routerOptions,
source->svc_config_param);
MXS_INFO("Using configuration options from service '%s'.", source->name);
source_service = source;
}
else
{
MXS_ERROR("[%s] Service '%s' uses router module '%s' instead of"
" 'binlogrouter'.", service->name, source->name,
source->routerModule);
MXS_ERROR("Service '%s' uses router module '%s' instead of "
"'binlogrouter'.", source->name, source->routerModule);
err = true;
}
}
else
{
MXS_ERROR("Service '%s' not found.", param->value);
err = true;
}
}
param = config_get_param(params, "binlogdir");
if (param)
{
MXS_FREE(inst->binlogdir);
inst->binlogdir = MXS_STRDUP_A(param->value);
}
if (inst->binlogdir == NULL)
{
MXS_ERROR("No 'binlogdir' option found in source service, in parameters or in router_options.");
err = true;
}
else if (inst->fileroot == NULL)
{
MXS_ERROR("No 'filestem' option found in source service, in parameters or in router_options.");
err = true;
}
else if (ensure_dir_ok(inst->binlogdir, R_OK) && ensure_dir_ok(inst->avrodir, W_OK))
{
snprintf(inst->binlog_name, sizeof(inst->binlog_name), BINLOG_NAMEFMT, inst->fileroot, first_file);
inst->prevbinlog[0] = '\0';
MXS_NOTICE("[%s] Reading MySQL binlog files from %s", service->name, inst->binlogdir);
MXS_NOTICE("[%s] Avro files stored at: %s", service->name, inst->avrodir);
MXS_NOTICE("[%s] First binlog is: %s", service->name, inst->binlog_name);
}
int pcreerr;
size_t erroff;
pcre2_code *create_re = pcre2_compile((PCRE2_SPTR) create_table_regex,
PCRE2_ZERO_TERMINATED, 0, &pcreerr, &erroff, NULL);
ss_dassert(create_re); // This should almost never fail
pcre2_code *alter_re = pcre2_compile((PCRE2_SPTR) alter_table_regex,
PCRE2_ZERO_TERMINATED, 0, &pcreerr, &erroff, NULL);
ss_dassert(alter_re); // This should almost never fail
if (create_re && alter_re)
{
inst->create_table_re = create_re;
inst->alter_table_re = alter_re;
}
else
{
err = true;
}
const int flags = SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE;
sqlite3* sqlite_handle;
const char* avrodir = config_get_string(service->svc_config_param, "avrodir");
char dbpath[PATH_MAX + 1];
snprintf(dbpath, sizeof(dbpath), "/%s/%s", inst->avrodir, avro_index_name);
snprintf(dbpath, sizeof(dbpath), "/%s/%s", avrodir, avro_index_name);
if (access(dbpath, W_OK) == 0)
if (sqlite3_open_v2(dbpath, &sqlite_handle, flags, NULL) != SQLITE_OK)
{
MXS_NOTICE("[%s] Using existing GTID index: %s", service->name, dbpath);
}
if (sqlite3_open_v2(dbpath, &inst->sqlite_handle,
SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL) != SQLITE_OK)
{
MXS_ERROR("Failed to open SQLite database '%s': %s", dbpath,
sqlite3_errmsg(inst->sqlite_handle));
MXS_ERROR("Failed to open SQLite database '%s': %s", dbpath, sqlite3_errmsg(sqlite_handle));
err = true;
}
else if (!create_tables(inst->sqlite_handle))
else if (!create_tables(sqlite_handle))
{
err = true;
}
if (err)
{
sqlite3_close_v2(inst->sqlite_handle);
MXS_FREE(inst->avrodir);
MXS_FREE(inst->binlogdir);
MXS_FREE(inst->fileroot);
delete inst;
sqlite3_close_v2(sqlite_handle);
return NULL;
}
/* AVRO converter init */
avro_load_conversion_state(inst);
avro_load_metadata_from_schemas(inst);
return new (std::nothrow) Avro(service, service->svc_config_param,
sqlite_handle, source_service);
}
Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, sqlite3* handle, SERVICE* source):
service(service),
filestem(config_get_string(params, "filestem")),
binlogdir(config_get_string(params, "binlogdir")),
avrodir(config_get_string(params, "avrodir")),
current_pos(4),
binlog_fd(-1),
trx_count(0),
trx_target(config_get_integer(params, "group_trx")),
row_count(0),
row_target(config_get_integer(params, "group_rows")),
block_size(config_get_size(params, "block_size")),
codec(static_cast<mxs_avro_codec_type>(config_get_enum(params, "codec", codec_values))),
sqlite_handle(handle),
task_handle(0),
stats{0}
{
int pcreerr;
size_t erroff;
create_table_re = pcre2_compile((PCRE2_SPTR) create_table_regex, PCRE2_ZERO_TERMINATED,
0, &pcreerr, &erroff, NULL);
ss_dassert(create_table_re); // This should never fail
alter_table_re = pcre2_compile((PCRE2_SPTR) alter_table_regex, PCRE2_ZERO_TERMINATED,
0, &pcreerr, &erroff, NULL);
ss_dassert(alter_table_re); // This should never fail
if (source)
{
read_source_service_options(source);
}
char filename[BINLOG_FNAMELEN + 1];
snprintf(filename, sizeof(filename), BINLOG_NAMEFMT, filestem.c_str(),
config_get_integer(params, "start_index"));
binlog_name = filename;
MXS_NOTICE("Reading MySQL binlog files from %s", binlogdir.c_str());
MXS_NOTICE("Avro files stored at: %s", avrodir.c_str());
MXS_NOTICE("First binlog is: %s", binlog_name.c_str());
// TODO: Do these in Avro::create
avro_load_conversion_state(this);
avro_load_metadata_from_schemas(this);
/* Start the scan, read, convert AVRO task */
conversion_task_ctl(inst, true);
conversion_task_ctl(this, true);
MXS_INFO("current MySQL binlog file is %s, pos is %lu\n",
inst->binlog_name, inst->current_pos);
return (MXS_ROUTER *) inst;
}
/**
@ -683,15 +639,13 @@ diagnostics(MXS_ROUTER *router, DCB *dcb)
{
Avro *router_inst = (Avro *) router;
dcb_printf(dcb, "\tAVRO Converter infofile: %s/%s\n",
router_inst->avrodir, AVRO_PROGRESS_FILE);
dcb_printf(dcb, "\tAVRO files directory: %s\n",
router_inst->avrodir);
router_inst->avrodir.c_str());
dcb_printf(dcb, "\tBinlog directory: %s\n",
router_inst->binlogdir);
router_inst->binlogdir.c_str());
dcb_printf(dcb, "\tCurrent binlog file: %s\n",
router_inst->binlog_name);
router_inst->binlog_name.c_str());
dcb_printf(dcb, "\tCurrent binlog position: %lu\n",
router_inst->current_pos);
dcb_printf(dcb, "\tCurrent GTID value: %lu-%lu-%lu\n",
@ -723,12 +677,12 @@ static json_t* diagnostics_json(const MXS_ROUTER *router)
json_t* rval = json_object();
char pathbuf[PATH_MAX + 1];
snprintf(pathbuf, sizeof(pathbuf), "%s/%s", router_inst->avrodir, AVRO_PROGRESS_FILE);
snprintf(pathbuf, sizeof(pathbuf), "%s/%s", router_inst->avrodir.c_str(), AVRO_PROGRESS_FILE);
json_object_set_new(rval, "infofile", json_string(pathbuf));
json_object_set_new(rval, "avrodir", json_string(router_inst->avrodir));
json_object_set_new(rval, "binlogdir", json_string(router_inst->binlogdir));
json_object_set_new(rval, "binlog_name", json_string(router_inst->binlog_name));
json_object_set_new(rval, "avrodir", json_string(router_inst->avrodir.c_str()));
json_object_set_new(rval, "binlogdir", json_string(router_inst->binlogdir.c_str()));
json_object_set_new(rval, "binlog_name", json_string(router_inst->binlog_name.c_str()));
json_object_set_new(rval, "binlog_pos", json_integer(router_inst->current_pos));
snprintf(pathbuf, sizeof(pathbuf), "%lu-%lu-%lu", router_inst->gtid.domain,
@ -798,105 +752,43 @@ bool converter_func(Worker::Call::action_t action, Avro* router)
return false;
}
bool ok = true;
avro_binlog_end_t binlog_end = AVRO_OK;
bool progress = false;
avro_binlog_end_t binlog_end = AVRO_BINLOG_ERROR;
while (!router->service->svc_do_shutdown && ok && binlog_end == AVRO_OK)
uint64_t start_pos = router->current_pos;
std::string binlog_name = router->binlog_name;
if (avro_open_binlog(router->binlogdir.c_str(), router->binlog_name.c_str(), &router->binlog_fd))
{
uint64_t start_pos = router->current_pos;
char binlog_name[BINLOG_FNAMELEN + 1];
strcpy(binlog_name, router->binlog_name);
binlog_end = avro_read_all_events(router);
if (avro_open_binlog(router->binlogdir, router->binlog_name, &router->binlog_fd))
if (router->current_pos != start_pos || binlog_name != router->binlog_name)
{
binlog_end = avro_read_all_events(router);
if (router->current_pos != start_pos || strcmp(binlog_name, router->binlog_name) != 0)
{
/** We processed some data, reset the conversion task delay */
router->task_delay = 1;
/** Update the GTID index */
avro_update_index(router);
}
avro_close_binlog(router->binlog_fd);
}
else
{
binlog_end = AVRO_BINLOG_ERROR;
/** Update the GTID index */
avro_update_index(router);
progress = true;
}
avro_close_binlog(router->binlog_fd);
}
static int logged = true;
/** We reached end of file, flush unwritten records to disk */
if (router->task_delay == 1)
if (progress)
{
avro_flush_all_tables(router, AVROROUTER_FLUSH);
avro_save_conversion_state(router);
logged = false;
}
if (binlog_end == AVRO_LAST_FILE)
if (binlog_end == AVRO_LAST_FILE && !logged)
{
router->task_delay = MXS_MIN(router->task_delay + 1, AVRO_TASK_DELAY_MAX);
logged = true;
MXS_INFO("Stopped processing file %s at position %lu. Waiting until"
" more data is written before continuing. Next check in %d seconds.",
router->binlog_name, router->current_pos, router->task_delay);
" more data is written before continuing.",
router->binlog_name.c_str(), router->current_pos);
}
return true;
}
/**
* @brief Ensure directory exists and is writable
*
* TODO: Move this as a function in the core
*
* @param path Path to directory
* @param mode One of O_RDONLY, O_WRONLY or O_RDWR
* @return True if directory exists and can be opened with @p mode permission
*/
static bool ensure_dir_ok(const char* path, int mode)
{
bool rval = false;
if (path)
{
char resolved[PATH_MAX + 1];
const char *rp = realpath(path, resolved);
if (rp == NULL && errno == ENOENT)
{
rp = path;
}
if (rp)
{
/** Make sure the directory exists */
if (mkdir(rp, 0774) == 0 || errno == EEXIST)
{
if (access(rp, mode) == 0)
{
rval = true;
}
else
{
MXS_ERROR("Failed to access directory '%s': %d, %s", rp,
errno, mxs_strerror(errno));
}
}
else
{
MXS_ERROR("Failed to create directory '%s': %d, %s", rp,
errno, mxs_strerror(errno));
}
}
else
{
MXS_ERROR("Failed to resolve real path name for '%s': %d, %s", path,
errno, mxs_strerror(errno));
}
}
return rval;
}