diff --git a/server/modules/routing/avrorouter/avro.cc b/server/modules/routing/avrorouter/avro.cc index 937af475c..93d1a4d71 100644 --- a/server/modules/routing/avrorouter/avro.cc +++ b/server/modules/routing/avrorouter/avro.cc @@ -53,8 +53,6 @@ using namespace maxscale; #define BINLOG_NAMEFMT "%s.%06d" #endif -#define AVRO_TASK_DELAY_MAX 15 - /** For detection of CREATE/ALTER TABLE statements */ static const char* create_table_regex = "(?i)create[a-z0-9[:space:]_]+table"; @@ -82,7 +80,7 @@ int blr_file_get_next_binlogname(const char *router); bool avro_load_conversion_state(Avro *router); void avro_load_metadata_from_schemas(Avro *router); int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata); -static bool ensure_dir_ok(const char* path, int mode); +static bool ensure_dir_ok(std::string path, int mode); bool avro_save_conversion_state(Avro *router); static void stats_func(void *); void avro_index_file(Avro *router, const char* path); @@ -180,10 +178,10 @@ static bool avro_handle_purge(const MODULECMD_ARG *args, json_t** output) conversion_task_ctl(inst, false); // Then delete the files - return do_unlink("%s/%s", inst->avrodir, AVRO_PROGRESS_FILE) && // State file - do_unlink("/%s/%s", inst->avrodir, avro_index_name) && // Index database - do_unlink_with_pattern("/%s/*.avro", inst->avrodir) && // .avro files - do_unlink_with_pattern("/%s/*.avsc", inst->avrodir); // .avsc files + return do_unlink("%s/%s", inst->avrodir.c_str(), AVRO_PROGRESS_FILE) && // State file + do_unlink("/%s/%s", inst->avrodir.c_str(), avro_index_name) && // Index database + do_unlink_with_pattern("/%s/*.avro", inst->avrodir.c_str()) && // .avro files + do_unlink_with_pattern("/%s/*.avsc", inst->avrodir.c_str()); // .avsc files } /** @@ -372,7 +370,7 @@ static bool conversion_task_ctl(Avro *inst, bool start) if (start) { - inst->task_handle = worker->delayed_call(inst->task_delay * 1000, converter_func, inst); + inst->task_handle = worker->delayed_call(1000, converter_func, inst); } rval = true; @@ -392,20 +390,20 @@ static bool conversion_task_ctl(Avro *inst, bool start) * @param inst Avro router instance * @param options The @c router_options of a binlogrouter instance */ -void read_source_service_options(Avro *inst, const char** options, - MXS_CONFIG_PARAMETER* params) +void Avro::read_source_service_options(SERVICE* source) { + char** options = source->routerOptions; + MXS_CONFIG_PARAMETER* params = source->svc_config_param; + for (MXS_CONFIG_PARAMETER* p = params; p; p = p->next) { if (strcmp(p->name, "binlogdir") == 0) { - MXS_FREE(inst->binlogdir); - inst->binlogdir = MXS_STRDUP_A(p->value); + binlogdir = p->value; } else if (strcmp(p->name, "filestem") == 0) { - MXS_FREE(inst->fileroot); - inst->fileroot = MXS_STRDUP_A(p->value); + filestem = p->value; } } @@ -424,13 +422,11 @@ void read_source_service_options(Avro *inst, const char** options, if (strcmp(option, "binlogdir") == 0) { - MXS_FREE(inst->binlogdir); - inst->binlogdir = MXS_STRDUP_A(value); + binlogdir = value; } else if (strcmp(option, "filestem") == 0) { - MXS_FREE(inst->fileroot); - inst->fileroot = MXS_STRDUP_A(value); + filestem = value; } } } @@ -450,45 +446,17 @@ void read_source_service_options(Avro *inst, const char** options, * * @return The instance data for this new instance */ -static MXS_ROUTER * -createInstance(SERVICE *service, char **options) +MXS_ROUTER* createInstance(SERVICE *service, char **options) { - Avro *inst = new (std::nothrow) Avro; + return Avro::create(service); +} - if (inst == NULL) - { - return NULL; - } - - memset(&inst->stats, 0, sizeof(AVRO_ROUTER_STATS)); - inst->service = service; - inst->binlog_fd = -1; - inst->current_pos = 4; - inst->binlog_position = 4; - inst->lastEventTimestamp = 0; - inst->binlog_position = 0; - inst->task_delay = 1; - inst->row_count = 0; - inst->trx_count = 0; - inst->binlogdir = NULL; - - MXS_CONFIG_PARAMETER *params = service->svc_config_param; - - inst->avrodir = MXS_STRDUP_A(config_get_string(params, "avrodir")); - inst->fileroot = MXS_STRDUP_A(config_get_string(params, "filestem")); - inst->row_target = config_get_integer(params, "group_rows"); - inst->trx_target = config_get_integer(params, "group_trx"); - inst->codec = static_cast(config_get_enum(params, "codec", codec_values)); - int first_file = config_get_integer(params, "start_index"); - inst->block_size = config_get_size(params, "block_size"); - - MXS_CONFIG_PARAMETER *param = config_get_param(params, "source"); - inst->gtid.domain = 0; - inst->gtid.event_num = 0; - inst->gtid.seq = 0; - inst->gtid.server_id = 0; - inst->gtid.timestamp = 0; +//static +Avro* Avro::create(SERVICE* service) +{ bool err = false; + SERVICE* source_service = NULL; + MXS_CONFIG_PARAMETER *param = config_get_param(service->svc_config_param, "source"); if (param) { @@ -499,109 +467,97 @@ createInstance(SERVICE *service, char **options) { if (strcmp(source->routerModule, "binlogrouter") == 0) { - MXS_NOTICE("[%s] Using configuration options from service '%s'.", - service->name, source->name); - read_source_service_options(inst, (const char**)source->routerOptions, - source->svc_config_param); + MXS_INFO("Using configuration options from service '%s'.", source->name); + source_service = source; } else { - MXS_ERROR("[%s] Service '%s' uses router module '%s' instead of" - " 'binlogrouter'.", service->name, source->name, - source->routerModule); + MXS_ERROR("Service '%s' uses router module '%s' instead of " + "'binlogrouter'.", source->name, source->routerModule); err = true; } } + else + { + MXS_ERROR("Service '%s' not found.", param->value); + err = true; + } } - param = config_get_param(params, "binlogdir"); - - if (param) - { - MXS_FREE(inst->binlogdir); - inst->binlogdir = MXS_STRDUP_A(param->value); - } - - if (inst->binlogdir == NULL) - { - MXS_ERROR("No 'binlogdir' option found in source service, in parameters or in router_options."); - err = true; - } - else if (inst->fileroot == NULL) - { - MXS_ERROR("No 'filestem' option found in source service, in parameters or in router_options."); - err = true; - } - else if (ensure_dir_ok(inst->binlogdir, R_OK) && ensure_dir_ok(inst->avrodir, W_OK)) - { - snprintf(inst->binlog_name, sizeof(inst->binlog_name), BINLOG_NAMEFMT, inst->fileroot, first_file); - inst->prevbinlog[0] = '\0'; - - MXS_NOTICE("[%s] Reading MySQL binlog files from %s", service->name, inst->binlogdir); - MXS_NOTICE("[%s] Avro files stored at: %s", service->name, inst->avrodir); - MXS_NOTICE("[%s] First binlog is: %s", service->name, inst->binlog_name); - } - - int pcreerr; - size_t erroff; - pcre2_code *create_re = pcre2_compile((PCRE2_SPTR) create_table_regex, - PCRE2_ZERO_TERMINATED, 0, &pcreerr, &erroff, NULL); - ss_dassert(create_re); // This should almost never fail - pcre2_code *alter_re = pcre2_compile((PCRE2_SPTR) alter_table_regex, - PCRE2_ZERO_TERMINATED, 0, &pcreerr, &erroff, NULL); - ss_dassert(alter_re); // This should almost never fail - - if (create_re && alter_re) - { - inst->create_table_re = create_re; - inst->alter_table_re = alter_re; - } - else - { - err = true; - } - + const int flags = SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE; + sqlite3* sqlite_handle; + const char* avrodir = config_get_string(service->svc_config_param, "avrodir"); char dbpath[PATH_MAX + 1]; - snprintf(dbpath, sizeof(dbpath), "/%s/%s", inst->avrodir, avro_index_name); + snprintf(dbpath, sizeof(dbpath), "/%s/%s", avrodir, avro_index_name); - if (access(dbpath, W_OK) == 0) + if (sqlite3_open_v2(dbpath, &sqlite_handle, flags, NULL) != SQLITE_OK) { - MXS_NOTICE("[%s] Using existing GTID index: %s", service->name, dbpath); - } - - if (sqlite3_open_v2(dbpath, &inst->sqlite_handle, - SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL) != SQLITE_OK) - { - MXS_ERROR("Failed to open SQLite database '%s': %s", dbpath, - sqlite3_errmsg(inst->sqlite_handle)); + MXS_ERROR("Failed to open SQLite database '%s': %s", dbpath, sqlite3_errmsg(sqlite_handle)); err = true; } - else if (!create_tables(inst->sqlite_handle)) + else if (!create_tables(sqlite_handle)) { err = true; } if (err) { - sqlite3_close_v2(inst->sqlite_handle); - MXS_FREE(inst->avrodir); - MXS_FREE(inst->binlogdir); - MXS_FREE(inst->fileroot); - delete inst; + sqlite3_close_v2(sqlite_handle); return NULL; } - /* AVRO converter init */ - avro_load_conversion_state(inst); - avro_load_metadata_from_schemas(inst); + return new (std::nothrow) Avro(service, service->svc_config_param, + sqlite_handle, source_service); +} + +Avro::Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, sqlite3* handle, SERVICE* source): + service(service), + filestem(config_get_string(params, "filestem")), + binlogdir(config_get_string(params, "binlogdir")), + avrodir(config_get_string(params, "avrodir")), + current_pos(4), + binlog_fd(-1), + trx_count(0), + trx_target(config_get_integer(params, "group_trx")), + row_count(0), + row_target(config_get_integer(params, "group_rows")), + block_size(config_get_size(params, "block_size")), + codec(static_cast(config_get_enum(params, "codec", codec_values))), + sqlite_handle(handle), + task_handle(0), + stats{0} +{ + int pcreerr; + size_t erroff; + create_table_re = pcre2_compile((PCRE2_SPTR) create_table_regex, PCRE2_ZERO_TERMINATED, + 0, &pcreerr, &erroff, NULL); + ss_dassert(create_table_re); // This should never fail + alter_table_re = pcre2_compile((PCRE2_SPTR) alter_table_regex, PCRE2_ZERO_TERMINATED, + 0, &pcreerr, &erroff, NULL); + ss_dassert(alter_table_re); // This should never fail + + if (source) + { + read_source_service_options(source); + } + + char filename[BINLOG_FNAMELEN + 1]; + snprintf(filename, sizeof(filename), BINLOG_NAMEFMT, filestem.c_str(), + config_get_integer(params, "start_index")); + binlog_name = filename; + + MXS_NOTICE("Reading MySQL binlog files from %s", binlogdir.c_str()); + MXS_NOTICE("Avro files stored at: %s", avrodir.c_str()); + MXS_NOTICE("First binlog is: %s", binlog_name.c_str()); + + // TODO: Do these in Avro::create + avro_load_conversion_state(this); + avro_load_metadata_from_schemas(this); /* Start the scan, read, convert AVRO task */ - conversion_task_ctl(inst, true); + conversion_task_ctl(this, true); - MXS_INFO("current MySQL binlog file is %s, pos is %lu\n", - inst->binlog_name, inst->current_pos); - return (MXS_ROUTER *) inst; } /** @@ -683,15 +639,13 @@ diagnostics(MXS_ROUTER *router, DCB *dcb) { Avro *router_inst = (Avro *) router; - dcb_printf(dcb, "\tAVRO Converter infofile: %s/%s\n", - router_inst->avrodir, AVRO_PROGRESS_FILE); dcb_printf(dcb, "\tAVRO files directory: %s\n", - router_inst->avrodir); + router_inst->avrodir.c_str()); dcb_printf(dcb, "\tBinlog directory: %s\n", - router_inst->binlogdir); + router_inst->binlogdir.c_str()); dcb_printf(dcb, "\tCurrent binlog file: %s\n", - router_inst->binlog_name); + router_inst->binlog_name.c_str()); dcb_printf(dcb, "\tCurrent binlog position: %lu\n", router_inst->current_pos); dcb_printf(dcb, "\tCurrent GTID value: %lu-%lu-%lu\n", @@ -723,12 +677,12 @@ static json_t* diagnostics_json(const MXS_ROUTER *router) json_t* rval = json_object(); char pathbuf[PATH_MAX + 1]; - snprintf(pathbuf, sizeof(pathbuf), "%s/%s", router_inst->avrodir, AVRO_PROGRESS_FILE); + snprintf(pathbuf, sizeof(pathbuf), "%s/%s", router_inst->avrodir.c_str(), AVRO_PROGRESS_FILE); json_object_set_new(rval, "infofile", json_string(pathbuf)); - json_object_set_new(rval, "avrodir", json_string(router_inst->avrodir)); - json_object_set_new(rval, "binlogdir", json_string(router_inst->binlogdir)); - json_object_set_new(rval, "binlog_name", json_string(router_inst->binlog_name)); + json_object_set_new(rval, "avrodir", json_string(router_inst->avrodir.c_str())); + json_object_set_new(rval, "binlogdir", json_string(router_inst->binlogdir.c_str())); + json_object_set_new(rval, "binlog_name", json_string(router_inst->binlog_name.c_str())); json_object_set_new(rval, "binlog_pos", json_integer(router_inst->current_pos)); snprintf(pathbuf, sizeof(pathbuf), "%lu-%lu-%lu", router_inst->gtid.domain, @@ -798,105 +752,43 @@ bool converter_func(Worker::Call::action_t action, Avro* router) return false; } - bool ok = true; - avro_binlog_end_t binlog_end = AVRO_OK; + bool progress = false; + avro_binlog_end_t binlog_end = AVRO_BINLOG_ERROR; - while (!router->service->svc_do_shutdown && ok && binlog_end == AVRO_OK) + uint64_t start_pos = router->current_pos; + std::string binlog_name = router->binlog_name; + + if (avro_open_binlog(router->binlogdir.c_str(), router->binlog_name.c_str(), &router->binlog_fd)) { - uint64_t start_pos = router->current_pos; - char binlog_name[BINLOG_FNAMELEN + 1]; - strcpy(binlog_name, router->binlog_name); + binlog_end = avro_read_all_events(router); - if (avro_open_binlog(router->binlogdir, router->binlog_name, &router->binlog_fd)) + if (router->current_pos != start_pos || binlog_name != router->binlog_name) { - binlog_end = avro_read_all_events(router); - - if (router->current_pos != start_pos || strcmp(binlog_name, router->binlog_name) != 0) - { - /** We processed some data, reset the conversion task delay */ - router->task_delay = 1; - - /** Update the GTID index */ - avro_update_index(router); - } - - avro_close_binlog(router->binlog_fd); - } - else - { - binlog_end = AVRO_BINLOG_ERROR; + /** Update the GTID index */ + avro_update_index(router); + progress = true; } + + avro_close_binlog(router->binlog_fd); } + static int logged = true; + /** We reached end of file, flush unwritten records to disk */ - if (router->task_delay == 1) + if (progress) { avro_flush_all_tables(router, AVROROUTER_FLUSH); avro_save_conversion_state(router); + logged = false; } - if (binlog_end == AVRO_LAST_FILE) + if (binlog_end == AVRO_LAST_FILE && !logged) { - router->task_delay = MXS_MIN(router->task_delay + 1, AVRO_TASK_DELAY_MAX); - + logged = true; MXS_INFO("Stopped processing file %s at position %lu. Waiting until" - " more data is written before continuing. Next check in %d seconds.", - router->binlog_name, router->current_pos, router->task_delay); + " more data is written before continuing.", + router->binlog_name.c_str(), router->current_pos); } return true; } - -/** - * @brief Ensure directory exists and is writable - * - * TODO: Move this as a function in the core - * - * @param path Path to directory - * @param mode One of O_RDONLY, O_WRONLY or O_RDWR - * @return True if directory exists and can be opened with @p mode permission - */ -static bool ensure_dir_ok(const char* path, int mode) -{ - bool rval = false; - - if (path) - { - char resolved[PATH_MAX + 1]; - const char *rp = realpath(path, resolved); - - if (rp == NULL && errno == ENOENT) - { - rp = path; - } - - if (rp) - { - /** Make sure the directory exists */ - if (mkdir(rp, 0774) == 0 || errno == EEXIST) - { - if (access(rp, mode) == 0) - { - rval = true; - } - else - { - MXS_ERROR("Failed to access directory '%s': %d, %s", rp, - errno, mxs_strerror(errno)); - } - } - else - { - MXS_ERROR("Failed to create directory '%s': %d, %s", rp, - errno, mxs_strerror(errno)); - } - } - else - { - MXS_ERROR("Failed to resolve real path name for '%s': %d, %s", path, - errno, mxs_strerror(errno)); - } - } - - return rval; -} diff --git a/server/modules/routing/avrorouter/avro_client.cc b/server/modules/routing/avrorouter/avro_client.cc index 137b7c2e2..a6aec06c6 100644 --- a/server/modules/routing/avrorouter/avro_client.cc +++ b/server/modules/routing/avrorouter/avro_client.cc @@ -460,7 +460,7 @@ avro_client_process_command(Avro *router, AvroSession *client, GWBUF *queue) memcpy(&client->gtid_start, &client->gtid, sizeof(client->gtid_start)); } - if (file_in_dir(router->avrodir, client->avro_binfile)) + if (file_in_dir(router->avrodir.c_str(), client->avro_binfile)) { /* set callback routine for data sending */ dcb_add_callback(client->dcb, DCB_REASON_DRAINED, avro_client_callback, client); @@ -763,7 +763,7 @@ static bool avro_client_stream_data(AvroSession *client) if (strnlen(client->avro_binfile, 1)) { char filename[PATH_MAX + 1]; - snprintf(filename, PATH_MAX, "%s/%s", router->avrodir, client->avro_binfile); + snprintf(filename, PATH_MAX, "%s/%s", router->avrodir.c_str(), client->avro_binfile); bool ok = true; @@ -976,11 +976,11 @@ int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata) switch (client->format) { case AVRO_FORMAT_JSON: - schema = read_avro_json_schema(client->avro_binfile, client->router->avrodir); + schema = read_avro_json_schema(client->avro_binfile, client->router->avrodir.c_str()); break; case AVRO_FORMAT_AVRO: - schema = read_avro_binary_schema(client->avro_binfile, client->router->avrodir); + schema = read_avro_binary_schema(client->avro_binfile, client->router->avrodir.c_str()); break; default: @@ -997,7 +997,7 @@ int avro_client_callback(DCB *dcb, DCB_REASON reason, void *userdata) bool read_more = avro_client_stream_data(client); char filename[PATH_MAX + 1]; - print_next_filename(client->avro_binfile, client->router->avrodir, + print_next_filename(client->avro_binfile, client->router->avrodir.c_str(), filename, sizeof(filename)); bool next_file; @@ -1047,7 +1047,7 @@ AvroSession* AvroSession::create(Avro* inst, MXS_SESSION* session) AvroSession* client = NULL; sqlite3* handle; char dbpath[PATH_MAX + 1]; - snprintf(dbpath, sizeof(dbpath), "/%s/%s", inst->avrodir, avro_index_name); + snprintf(dbpath, sizeof(dbpath), "/%s/%s", inst->avrodir.c_str(), avro_index_name); if (sqlite3_open_v2(dbpath, &handle, SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE, NULL) != SQLITE_OK) diff --git a/server/modules/routing/avrorouter/avro_file.cc b/server/modules/routing/avrorouter/avro_file.cc index 9073d449c..d5f5c10ee 100644 --- a/server/modules/routing/avrorouter/avro_file.cc +++ b/server/modules/routing/avrorouter/avro_file.cc @@ -165,7 +165,7 @@ bool avro_save_conversion_state(Avro *router) FILE *config_file; char filename[PATH_MAX + 1]; - snprintf(filename, sizeof(filename), "%s/" AVRO_PROGRESS_FILE ".tmp", router->avrodir); + snprintf(filename, sizeof(filename), "%s/" AVRO_PROGRESS_FILE ".tmp", router->avrodir.c_str()); /* open file for writing */ config_file = fopen(filename, "wb"); @@ -181,12 +181,12 @@ bool avro_save_conversion_state(Avro *router) fprintf(config_file, "position=%lu\n", router->current_pos); fprintf(config_file, "gtid=%lu-%lu-%lu:%lu\n", router->gtid.domain, router->gtid.server_id, router->gtid.seq, router->gtid.event_num); - fprintf(config_file, "file=%s\n", router->binlog_name); + fprintf(config_file, "file=%s\n", router->binlog_name.c_str()); fclose(config_file); /* rename tmp file to right filename */ char newname[PATH_MAX + 1]; - snprintf(newname, sizeof(newname), "%s/" AVRO_PROGRESS_FILE, router->avrodir); + snprintf(newname, sizeof(newname), "%s/" AVRO_PROGRESS_FILE, router->avrodir.c_str()); int rc = rename(filename, newname); if (rc == -1) @@ -246,7 +246,7 @@ static int conv_state_handler(void* data, const char* section, const char* key, return 0; } - strcpy(router->binlog_name, value); + router->binlog_name = value; } else { @@ -268,7 +268,7 @@ bool avro_load_conversion_state(Avro *router) char filename[PATH_MAX + 1]; bool rval = false; - snprintf(filename, sizeof(filename), "%s/" AVRO_PROGRESS_FILE, router->avrodir); + snprintf(filename, sizeof(filename), "%s/" AVRO_PROGRESS_FILE, router->avrodir.c_str()); /** No stored state, this is the first time the router is started */ if (access(filename, F_OK) == -1) @@ -285,7 +285,7 @@ bool avro_load_conversion_state(Avro *router) case 0: rval = true; MXS_NOTICE("Loaded stored binary log conversion state: File: [%s] Position: [%ld] GTID: [%lu-%lu-%lu:%lu]", - router->binlog_name, router->current_pos, router->gtid.domain, + router->binlog_name.c_str(), router->current_pos, router->gtid.domain, router->gtid.server_id, router->gtid.seq, router->gtid.event_num); break; @@ -319,12 +319,12 @@ static avro_binlog_end_t rotate_to_next_file_if_exists(Avro* router, uint64_t po { avro_binlog_end_t rval = AVRO_LAST_FILE; - if (binlog_next_file_exists(router->binlogdir, router->binlog_name)) + if (binlog_next_file_exists(router->binlogdir.c_str(), router->binlog_name.c_str())) { char next_binlog[BINLOG_FNAMELEN + 1]; if (snprintf(next_binlog, sizeof(next_binlog), - BINLOG_NAMEFMT, router->fileroot, - blr_file_get_next_binlogname(router->binlog_name)) >= (int)sizeof(next_binlog)) + BINLOG_NAMEFMT, router->filestem.c_str(), + blr_file_get_next_binlogname(router->binlog_name.c_str())) >= (int)sizeof(next_binlog)) { MXS_ERROR("Next binlog name did not fit into the allocated buffer " "but was truncated, aborting: %s", next_binlog); @@ -336,18 +336,17 @@ static avro_binlog_end_t rotate_to_next_file_if_exists(Avro* router, uint64_t po { MXS_NOTICE("End of binlog file [%s] at %lu with a " "close event. Rotating to next binlog file [%s].", - router->binlog_name, pos, next_binlog); + router->binlog_name.c_str(), pos, next_binlog); } else { MXS_NOTICE("End of binlog file [%s] at %lu with no " "close or rotate event. Rotating to next binlog file [%s].", - router->binlog_name, pos, next_binlog); + router->binlog_name.c_str(), pos, next_binlog); } rval = AVRO_OK; - strcpy(router->binlog_name, next_binlog); - router->binlog_position = 4; + router->binlog_name = next_binlog; router->current_pos = 4; } } @@ -355,7 +354,7 @@ static avro_binlog_end_t rotate_to_next_file_if_exists(Avro* router, uint64_t po { MXS_NOTICE("End of binlog file [%s] at %lu with a close event. " "Next binlog file does not exist, pausing file conversion.", - router->binlog_name, pos); + router->binlog_name.c_str(), pos); } return rval; @@ -372,11 +371,9 @@ static avro_binlog_end_t rotate_to_next_file_if_exists(Avro* router, uint64_t po */ static void rotate_to_file(Avro* router, uint64_t pos, const char *next_binlog) { - /** Binlog file is processed, prepare for next one */ MXS_NOTICE("End of binlog file [%s] at %lu. Rotating to file [%s].", - router->binlog_name, pos, next_binlog); - strcpy(router->binlog_name, next_binlog); // next_binlog is as big as router->binlog_name. - router->binlog_position = 4; + router->binlog_name.c_str(), pos, next_binlog); + router->binlog_name = next_binlog; router->current_pos = 4; } @@ -406,7 +403,7 @@ static GWBUF* read_event_data(Avro *router, REP_HEADER* hdr, uint64_t pos) { MXS_ERROR("Error reading the event at %lu in %s. " "%s, expected %d bytes.", - pos, router->binlog_name, + pos, router->binlog_name.c_str(), mxs_strerror(errno), hdr->event_size - BINLOG_EVENT_HDR_LEN); } @@ -414,7 +411,7 @@ static GWBUF* read_event_data(Avro *router, REP_HEADER* hdr, uint64_t pos) { MXS_ERROR("Short read when reading the event at %lu in %s. " "Expected %d bytes got %d bytes.", - pos, router->binlog_name, + pos, router->binlog_name.c_str(), hdr->event_size - BINLOG_EVENT_HDR_LEN, n); } gwbuf_free(result); @@ -474,7 +471,6 @@ avro_binlog_end_t avro_read_all_events(Avro *router) { uint8_t hdbuf[BINLOG_EVENT_HDR_LEN]; unsigned long long pos = router->current_pos; - unsigned long long last_known_commit = 4; char next_binlog[BINLOG_FNAMELEN + 1]; REP_HEADER hdr; int pending_transaction = 0; @@ -486,7 +482,7 @@ avro_binlog_end_t avro_read_all_events(Avro *router) if (router->binlog_fd == -1) { - MXS_ERROR("Current binlog file %s is not open", router->binlog_name); + MXS_ERROR("Current binlog file %s is not open", router->binlog_name.c_str()); return AVRO_BINLOG_ERROR; } @@ -503,54 +499,44 @@ avro_binlog_end_t avro_read_all_events(Avro *router) case -1: { MXS_ERROR("Failed to read binlog file %s at position %llu (%s).", - router->binlog_name, pos, + router->binlog_name.c_str(), pos, mxs_strerror(errno)); if (errno == EBADF) MXS_ERROR("Bad file descriptor in read binlog for file %s" ", descriptor %d.", - router->binlog_name, router->binlog_fd); + router->binlog_name.c_str(), router->binlog_fd); break; } default: MXS_ERROR("Short read when reading the header. " "Expected 19 bytes but got %d bytes. " "Binlog file is %s, position %llu", - n, router->binlog_name, pos); + n, router->binlog_name.c_str(), pos); break; } router->current_pos = pos; - if (pending_transaction > 0) + /* any error */ + if (n != 0) { - MXS_ERROR("Binlog '%s' ends at position %lu and has an incomplete transaction at %lu. " - "Stopping file conversion.", router->binlog_name, - router->current_pos, router->binlog_position); - return AVRO_OPEN_TRANSACTION; + return AVRO_BINLOG_ERROR; } else { - /* any error */ - if (n != 0) + do_checkpoint(router, &total_rows, &total_commits); + + MXS_INFO("Processed %lu transactions and %lu row events.", + total_commits, total_rows); + if (rotate_seen) { - return AVRO_BINLOG_ERROR; + rotate_to_file(router, pos, next_binlog); + return AVRO_OK; } else { - do_checkpoint(router, &total_rows, &total_commits); - - MXS_INFO("Processed %lu transactions and %lu row events.", - total_commits, total_rows); - if (rotate_seen) - { - rotate_to_file(router, pos, next_binlog); - return AVRO_OK; - } - else - { - return rotate_to_next_file_if_exists(router, pos, stop_seen); - } + return rotate_to_next_file_if_exists(router, pos, stop_seen); } } } @@ -569,8 +555,7 @@ avro_binlog_end_t avro_read_all_events(Avro *router) { MXS_ERROR("Invalid MariaDB 10 event type 0x%x. " "Binlog file is %s, position %llu", - hdr.event_type, router->binlog_name, pos); - router->binlog_position = last_known_commit; + hdr.event_type, router->binlog_name.c_str(), pos); router->current_pos = pos; return AVRO_BINLOG_ERROR; } @@ -581,7 +566,6 @@ avro_binlog_end_t avro_read_all_events(Avro *router) "size %d at %llu.", hdr.event_size, pos); - router->binlog_position = last_known_commit; router->current_pos = pos; return AVRO_BINLOG_ERROR; } @@ -590,20 +574,10 @@ avro_binlog_end_t avro_read_all_events(Avro *router) if (result == NULL) { - router->binlog_position = last_known_commit; router->current_pos = pos; - MXS_WARNING("an error has been found. " - "Setting safe pos to %lu, current pos %lu", - router->binlog_position, router->current_pos); return AVRO_BINLOG_ERROR; } - /* check for pending transaction */ - if (pending_transaction == 0) - { - last_known_commit = pos; - } - /* get event content */ ptr = GWBUF_DATA(result); @@ -633,8 +607,8 @@ avro_binlog_end_t avro_read_all_events(Avro *router) { char next_file[BLRM_BINLOG_NAME_STR_LEN + 1]; stop_seen = true; - snprintf(next_file, sizeof(next_file), BINLOG_NAMEFMT, router->fileroot, - blr_file_get_next_binlogname(router->binlog_name)); + snprintf(next_file, sizeof(next_file), BINLOG_NAMEFMT, router->filestem.c_str(), + blr_file_get_next_binlogname(router->binlog_name.c_str())); } else if (hdr.event_type == MARIADB_ANNOTATE_ROWS_EVENT) { @@ -730,14 +704,14 @@ avro_binlog_end_t avro_read_all_events(Avro *router) if (hdr.next_pos > 0 && hdr.next_pos < pos) { MXS_INFO("Binlog %s: next pos %u < pos %llu, truncating to %llu", - router->binlog_name, hdr.next_pos, pos, pos); + router->binlog_name.c_str(), hdr.next_pos, pos, pos); break; } if (hdr.next_pos > 0 && hdr.next_pos != (pos + original_size)) { MXS_INFO("Binlog %s: next pos %u != (pos %llu + event_size %u), truncating to %llu", - router->binlog_name, hdr.next_pos, pos, hdr.event_size, pos); + router->binlog_name.c_str(), hdr.next_pos, pos, hdr.event_size, pos); break; } @@ -766,7 +740,7 @@ avro_binlog_end_t avro_read_all_events(Avro *router) void avro_load_metadata_from_schemas(Avro *router) { char path[PATH_MAX + 1]; - snprintf(path, sizeof(path), "%s/*.avsc", router->avrodir); + snprintf(path, sizeof(path), "%s/*.avsc", router->avrodir.c_str()); glob_t files; if (glob(path, 0, NULL, &files) != GLOB_NOMATCH) diff --git a/server/modules/routing/avrorouter/avro_index.cc b/server/modules/routing/avrorouter/avro_index.cc index 7cd99d843..81a2f2cd3 100644 --- a/server/modules/routing/avrorouter/avro_index.cc +++ b/server/modules/routing/avrorouter/avro_index.cc @@ -181,7 +181,7 @@ void avro_index_file(Avro *router, const char* filename) void avro_update_index(Avro* router) { char path[PATH_MAX + 1]; - snprintf(path, sizeof(path), "%s/*.avro", router->avrodir); + snprintf(path, sizeof(path), "%s/*.avro", router->avrodir.c_str()); glob_t files; if (glob(path, 0, NULL, &files) != GLOB_NOMATCH) diff --git a/server/modules/routing/avrorouter/avro_rbr.cc b/server/modules/routing/avrorouter/avro_rbr.cc index ae19ba37b..4e058ce25 100644 --- a/server/modules/routing/avrorouter/avro_rbr.cc +++ b/server/modules/routing/avrorouter/avro_rbr.cc @@ -127,7 +127,7 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) { char filepath[PATH_MAX + 1]; snprintf(filepath, sizeof(filepath), "%s/%s.%06d.avro", - router->avrodir, table_ident, map->version); + router->avrodir.c_str(), table_ident, map->version); SAvroTable avro_table(avro_table_alloc(filepath, json_schema, codec_to_string(router->codec), @@ -145,7 +145,7 @@ bool handle_table_map_event(Avro *router, REP_HEADER *hdr, uint8_t *ptr) router->table_maps[table_ident] = map; router->open_tables[table_ident] = avro_table; - save_avro_schema(router->avrodir, json_schema, map.get()); + save_avro_schema(router->avrodir.c_str(), json_schema, map.get()); router->active_maps[map->id] = map; ss_dassert(router->active_maps[id] == map); MXS_DEBUG("Table %s mapped to %lu", table_ident, map->id); diff --git a/server/modules/routing/avrorouter/avrorouter.hh b/server/modules/routing/avrorouter/avrorouter.hh index 1bb564128..6e2ac6868 100644 --- a/server/modules/routing/avrorouter/avrorouter.hh +++ b/server/modules/routing/avrorouter/avrorouter.hh @@ -185,14 +185,6 @@ struct TABLE_MAP char* database; }; -/** - * The statistics for this AVRO router instance - */ -typedef struct -{ - int n_clients; /*< Number client sessions created */ -} AVRO_ROUTER_STATS; - struct AVRO_TABLE { ~AVRO_TABLE() @@ -257,18 +249,19 @@ typedef std::tr1::unordered_map AvroTables; typedef std::tr1::unordered_map MappedTables; typedef std::tr1::unordered_map ActiveMaps; -struct Avro +class Avro: public MXS_ROUTER { + Avro(const Avro&) = delete; + Avro& operator=(const Avro&) = delete; + +public: + static Avro* create(SERVICE* service); + SERVICE* service; /*< Pointer to the service using this router */ - int initbinlog; /*< Initial binlog file number */ - char* fileroot; /*< Root of binlog filename */ - unsigned int state; /*< State of the AVRO router */ - uint8_t lastEventReceived; /*< Last even received */ - uint32_t lastEventTimestamp; /*< Timestamp from last event */ - char* binlogdir; /*< The directory where the binlog files are stored */ - char* avrodir; /*< The directory with the AVRO files */ - char binlog_name[BINLOG_FNAMELEN + 1]; /*< Name of the current binlog file */ - uint64_t binlog_position; /*< last committed transaction position */ + std::string filestem; /*< Root of binlog filename */ + std::string binlogdir; /*< The directory where the binlog files are stored */ + std::string avrodir; /*< The directory with the AVRO files */ + std::string binlog_name; /*< Name of the current binlog file */ uint64_t current_pos; /*< Current binlog position */ int binlog_fd; /*< File descriptor of the binlog file being read */ pcre2_code* create_table_re; @@ -281,11 +274,6 @@ struct Avro MappedTables table_maps; AvroTables open_tables; CreatedTables created_tables; - sqlite3* sqlite_handle; - char prevbinlog[BINLOG_FNAMELEN + 1]; - int rotating; /*< Rotation in progress flag */ - AVRO_ROUTER_STATS stats; /*< Statistics for this router */ - int task_delay; /*< Delay in seconds until the next conversion takes place */ uint64_t trx_count; /*< Transactions processed */ uint64_t trx_target; /*< Minimum about of transactions that will trigger * a flush of all tables */ @@ -294,7 +282,17 @@ struct Avro * a flush of all tables */ uint64_t block_size; /**< Avro datablock size */ enum mxs_avro_codec_type codec; /**< Avro codec type, defaults to `null` */ + sqlite3* sqlite_handle; uint32_t task_handle; /**< Delayed task handle */ + + struct + { + int n_clients; /*< Number client sessions created */ + } stats; /*< Statistics for this router */ + +private: + Avro(SERVICE* service, MXS_CONFIG_PARAMETER* params, sqlite3* handle, SERVICE* source); + void read_source_service_options(SERVICE* source); }; class AvroSession: public MXS_ROUTER_SESSION